Merge branch 'main' into cn/more-database-states

pull/24376/head
kodiakhq[bot] 2021-09-15 19:44:11 +00:00 committed by GitHub
commit 957df68da3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 577 additions and 3549 deletions

40
Cargo.lock generated
View File

@ -836,16 +836,6 @@ dependencies = [
"sct",
]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]]
name = "data_types"
version = "0.1.0"
@ -1729,6 +1719,7 @@ dependencies = [
"flate2",
"futures",
"generated_types",
"hashbrown",
"heappy",
"hex",
"http",
@ -1745,7 +1736,6 @@ dependencies = [
"logfmt",
"metric",
"metric_exporters",
"metrics",
"mutable_buffer",
"num_cpus",
"object_store",
@ -2199,20 +2189,6 @@ dependencies = [
"test_helpers",
]
[[package]]
name = "metrics"
version = "0.1.0"
dependencies = [
"dashmap",
"hashbrown",
"observability_deps",
"opentelemetry",
"opentelemetry-prometheus",
"parking_lot",
"prometheus",
"snafu",
]
[[package]]
name = "mime"
version = "0.3.16"
@ -2713,8 +2689,6 @@ checksum = "e1cf9b1c4e9a6c4de793c632496fa490bdc0e1eea73f0c91394f7b6990935d22"
dependencies = [
"async-trait",
"crossbeam-channel",
"dashmap",
"fnv",
"futures",
"js-sys",
"lazy_static",
@ -2741,17 +2715,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee9c06c1366665e7d4dba6540a42ea48900a9c92dc5b963f3ae05fbba76dc63"
dependencies = [
"opentelemetry",
"prometheus",
"protobuf",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.8.0"
@ -4123,7 +4086,6 @@ dependencies = [
"itertools 0.10.1",
"lifecycle",
"metric",
"metrics",
"mutable_buffer",
"num_cpus",
"object_store",

View File

@ -95,7 +95,6 @@ iox_object_store = { path = "iox_object_store" }
logfmt = { path = "logfmt" }
metric = { path = "metric" }
metric_exporters = { path = "metric_exporters" }
metrics = { path = "metrics" }
mutable_buffer = { path = "mutable_buffer" }
num_cpus = "1.13.0"
object_store = { path = "object_store" }
@ -125,6 +124,7 @@ dirs = "3.0.1"
dotenv = "0.15.0"
flate2 = "1.0"
futures = "0.3"
hashbrown = "0.11"
http = "0.2.0"
hyper = "0.14"
libc = { version = "0.2" }

View File

@ -5,7 +5,6 @@ use std::{
collections::HashMap,
hash::{Hash, Hasher},
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
};
use chrono::{TimeZone, Utc};
@ -444,7 +443,7 @@ pub struct ShardConfig {
pub ignore_errors: bool,
/// Mapping between shard IDs and node groups. Other sharding rules use
/// ShardId as targets.
pub shards: Arc<HashMap<ShardId, Sink>>,
pub shards: HashMap<ShardId, Sink>,
}
/// Configuration for a specific IOx sink

View File

@ -1,5 +1,4 @@
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use regex::Regex;
@ -45,14 +44,12 @@ impl TryFrom<management::ShardConfig> for ShardConfig {
.map_or(Ok(None), |r| r.map(Some))
.field("hash_ring")?,
ignore_errors: proto.ignore_errors,
shards: Arc::new(
proto
.shards
.into_iter()
.map(|(k, v)| Ok((k, v.try_into()?)))
.collect::<Result<_, FieldViolation>>()
.field("shards")?,
),
shards: proto
.shards
.into_iter()
.map(|(k, v)| Ok((k, v.try_into()?)))
.collect::<Result<_, FieldViolation>>()
.field("shards")?,
})
}
}

View File

@ -1,26 +0,0 @@
[package]
name = "metrics"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
# This crate contains the application-specific abstraction of the metrics
# implementation for IOx. Any crate that wants to expose telemetry should use
# this crate.
#
# The crate exposes a special type of metrics registry which allows users to
# isolate their metrics from the rest of the system so they can be tested
# in isolation.
[dependencies] # In alphabetical order
dashmap = { version = "4.0.1" }
hashbrown = "0.11"
observability_deps = { path = "../observability_deps" }
opentelemetry = "0.16"
opentelemetry-prometheus = "0.9"
parking_lot = "0.11.1"
prometheus = "0.12"
snafu = "0.6"
[dev-dependencies] # In alphabetical order

View File

@ -1,293 +0,0 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
use hashbrown::HashMap;
use crate::KeyValue;
use hashbrown::hash_map::RawEntryMut;
use opentelemetry::attributes::{AttributeSet, DefaultAttributeEncoder};
/// A `Gauge` allows tracking multiple usize values by attribute set
///
/// Metrics can be recorded directly on the Gauge or when the attributes are
/// known ahead of time a `GaugeValue` can be obtained with `Gauge::gauge_value`
///
/// When a `Gauge` is dropped any contributions it made to any attribute sets
/// will be deducted
#[derive(Debug)]
pub struct Gauge {
    /// `GaugeState` stores the underlying state for this `Gauge`
    state: Arc<GaugeState>,
    /// Any contributions made by this instance of `Gauge` to the totals
    /// stored in `GaugeState`, keyed by the encoded attribute set
    values: HashMap<String, GaugeValue>,
    /// Attributes prepended to the attributes of every observation made
    /// through this `Gauge` (see `encode_attributes`)
    default_attributes: Vec<KeyValue>,
}
impl Gauge {
    /// Creates a new Gauge that isn't registered with and consequently
    /// won't report to any metrics registry
    ///
    /// Observations made to this Gauge, and any GaugeValues it creates,
    /// will still be computed correctly and visible but will not be
    /// reported to a central metric registry, and will not be visible
    /// to any other Gauge instance
    pub fn new_unregistered() -> Self {
        Self {
            state: Arc::new(Default::default()),
            values: Default::default(),
            default_attributes: Vec::new(),
        }
    }

    pub(crate) fn new(state: Arc<GaugeState>, default_attributes: Vec<KeyValue>) -> Self {
        Self {
            state,
            values: Default::default(),
            default_attributes,
        }
    }

    /// Gets a `GaugeValue` for a given set of attributes
    ///
    /// This allows fast value updates and retrieval when the attributes are
    /// known in advance
    pub fn gauge_value(&self, attributes: &[KeyValue]) -> GaugeValue {
        let (key, attribute_set) = self.encode_attributes(attributes);
        self.state.gauge_value_encoded(key, attribute_set)
    }

    /// Adds `delta` to the value tracked for this attribute set
    pub fn inc(&mut self, delta: usize, attributes: &[KeyValue]) {
        self.call(attributes, |value| value.inc(delta))
    }

    /// Subtracts `delta` from the value tracked for this attribute set
    pub fn decr(&mut self, delta: usize, attributes: &[KeyValue]) {
        self.call(attributes, |value| value.decr(delta))
    }

    /// Overwrites this Gauge's contribution for this attribute set
    pub fn set(&mut self, value: usize, attributes: &[KeyValue]) {
        self.call(attributes, |observer| observer.set(value))
    }

    /// Encodes `attributes` (prefixed by the default attributes) into the
    /// canonical string key used to identify the attribute set
    fn encode_attributes(&self, attributes: &[KeyValue]) -> (String, AttributeSet) {
        GaugeState::encode_attributes(self.default_attributes.iter().chain(attributes).cloned())
    }

    /// Applies `f` to the `GaugeValue` holding this Gauge's contribution for
    /// the given attribute set, lazily creating that value on first use
    fn call(&mut self, attributes: &[KeyValue], f: impl Fn(&mut GaugeValue)) {
        let (key, attribute_set) = self.encode_attributes(attributes);
        if let Some(existing) = self.values.get_mut(&key) {
            f(existing);
        } else {
            let mut created = self.state.gauge_value_encoded(key.clone(), attribute_set);
            f(&mut created);
            self.values.insert(key, created);
        }
    }
}
/// The shared state for a `Gauge`
///
/// Referenced via `Arc` from each `Gauge` instance and consulted when new
/// `GaugeValue`s are handed out
#[derive(Debug, Default)]
pub(crate) struct GaugeState {
    /// Keep tracks of every attribute set observed by this gauge
    ///
    /// The key is a sorted encoding of this attribute set, that reconciles
    /// duplicates in the same manner as OpenTelemetry (it uses the same encoder)
    /// which appears to be last-writer-wins
    ///
    /// NOTE(review): `DashMap` is a concurrent map, so entries can be read and
    /// inserted here without external locking
    values: DashMap<String, (GaugeValue, Vec<KeyValue>)>,
}
impl GaugeState {
    /// Visits the totals for all recorded attribute sets
    pub fn visit_values(&self, f: impl Fn(usize, &[KeyValue])) {
        for data in self.values.iter() {
            let (data, attributes) = data.value();
            f(data.get_total(), attributes)
        }
    }

    /// Encodes an attribute set into the canonical string used as the map
    /// key, returning the encoding alongside the parsed `AttributeSet`
    fn encode_attributes(attributes: impl IntoIterator<Item = KeyValue>) -> (String, AttributeSet) {
        let set = AttributeSet::from_attributes(attributes);
        let encoded = set.encoded(Some(&DefaultAttributeEncoder));
        (encoded, set)
    }

    /// Returns a fresh `GaugeValue` (with no local contribution yet) that
    /// feeds into the shared total for the attribute set identified by
    /// `encoded`, creating the entry on first use
    fn gauge_value_encoded(&self, encoded: String, keys: AttributeSet) -> GaugeValue {
        self.values
            .entry(encoded)
            .or_insert_with(|| {
                (
                    // It is created unregistered, adding it to values "registers" it
                    GaugeValue::new_unregistered(),
                    keys.iter()
                        .map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
                        .collect(),
                )
            })
            .value()
            .0
            .clone_empty()
    }
}
/// Shared accumulator summing the contributions of every `GaugeValue`
/// associated with a single attribute set
#[derive(Debug, Default)]
struct GaugeValueShared {
    /// The total contribution from all associated `GaugeValue`s
    total: AtomicUsize,
}

/// A `GaugeValue` is a single measurement associated with a `Gauge`
///
/// Every `GaugeValue` holds a reference to a `GaugeValueShared`; all
/// registered `GaugeValue`s for the same attribute set share one
/// `GaugeValueShared` that isn't shared with any others.
///
/// A `GaugeValue` tracks its own local observation and mirrors every change
/// into the shared total. When dropped, its contribution is withdrawn from
/// the shared total.
#[derive(Debug)]
pub struct GaugeValue {
    /// A reference to the shared pool
    shared: Arc<GaugeValueShared>,
    /// The locally observed value
    local: usize,
}

impl GaugeValue {
    /// Creates a new GaugeValue that isn't associated with any Gauge
    ///
    /// Observations made to it are still computed correctly and visible, but
    /// are not reported to a Gauge, a metric registry, or any independently
    /// created GaugeValue
    pub fn new_unregistered() -> Self {
        Self {
            shared: Default::default(),
            local: 0,
        }
    }

    /// Creates a GaugeValue with no local observation yet that shares the
    /// same underlying total as `self`
    pub fn clone_empty(&self) -> Self {
        Self {
            shared: Arc::clone(&self.shared),
            local: 0,
        }
    }

    /// Gets the combined total across every `GaugeValue` sharing this
    /// backing store
    pub fn get_total(&self) -> usize {
        self.shared.total.load(Ordering::Relaxed)
    }

    /// Gets the contribution made by this instance alone
    pub fn get_local(&self) -> usize {
        self.local
    }

    /// Increments the local value (and therefore the shared total) by `delta`
    pub fn inc(&mut self, delta: usize) {
        self.local += delta;
        self.shared.total.fetch_add(delta, Ordering::Relaxed);
    }

    /// Decrements the local value (and therefore the shared total) by `delta`
    pub fn decr(&mut self, delta: usize) {
        self.local -= delta;
        self.shared.total.fetch_sub(delta, Ordering::Relaxed);
    }

    /// Sets the local value, adjusting the shared total by the difference
    pub fn set(&mut self, new: usize) {
        if new > self.local {
            self.inc(new - self.local);
        } else if new < self.local {
            self.decr(self.local - new);
        }
    }
}

impl Drop for GaugeValue {
    fn drop(&mut self) {
        // Withdraw this instance's contribution from the shared total
        self.shared.total.fetch_sub(self.local, Ordering::Relaxed);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_tracker() {
        // `start` and both clones share a single underlying total; each
        // tracks its own local contribution
        let start = GaugeValue::new_unregistered();
        let mut t1 = start.clone_empty();
        let mut t2 = start.clone_empty();

        t1.set(200);
        assert_eq!(t1.get_total(), 200);
        assert_eq!(t2.get_total(), 200);
        assert_eq!(start.get_total(), 200);

        // Lowering t1's contribution lowers the shared total by the same amount
        t1.set(100);
        assert_eq!(t1.get_total(), 100);
        assert_eq!(t2.get_total(), 100);
        assert_eq!(start.get_total(), 100);

        t2.set(300);
        assert_eq!(t1.get_total(), 400);
        assert_eq!(t2.get_total(), 400);
        assert_eq!(start.get_total(), 400);

        t2.set(400);
        assert_eq!(t1.get_total(), 500);
        assert_eq!(t2.get_total(), 500);
        assert_eq!(start.get_total(), 500);

        // Dropping a GaugeValue withdraws its local contribution (400)
        std::mem::drop(t2);
        assert_eq!(t1.get_total(), 100);
        assert_eq!(start.get_total(), 100);

        std::mem::drop(t1);
        assert_eq!(start.get_total(), 0);
    }

    #[test]
    fn test_mixed() {
        // Two Gauges sharing one GaugeState contribute to the same totals
        let gauge_state = Arc::new(GaugeState::default());
        let mut gauge = Gauge::new(Arc::clone(&gauge_state), vec![KeyValue::new("foo", "bar")]);
        let mut gauge2 = Gauge::new(gauge_state, vec![KeyValue::new("foo", "bar")]);

        gauge.set(32, &[KeyValue::new("bingo", "bongo")]);
        gauge2.set(64, &[KeyValue::new("bingo", "bongo")]);

        let mut gauge_value = gauge.gauge_value(&[KeyValue::new("bingo", "bongo")]);
        gauge_value.set(12);

        assert_eq!(gauge_value.get_local(), 12);
        assert_eq!(gauge_value.get_total(), 32 + 64 + 12);

        // Dropping gauge2 removes its 64 from the shared total
        std::mem::drop(gauge2);
        assert_eq!(gauge_value.get_total(), 32 + 12);

        // A different attribute set accumulates separately
        gauge.inc(5, &[KeyValue::new("no", "match")]);
        assert_eq!(gauge_value.get_total(), 32 + 12);

        gauge.inc(5, &[KeyValue::new("bingo", "bongo")]);
        assert_eq!(gauge_value.get_total(), 32 + 12 + 5);

        // gauge's contributions (32 + 5) are withdrawn on drop, but the
        // GaugeValue's own 12 remains
        std::mem::drop(gauge);
        assert_eq!(gauge_value.get_total(), 12);
        assert_eq!(gauge_value.get_local(), 12);
    }
}

File diff suppressed because it is too large Load Diff

View File

@ -1,409 +0,0 @@
use std::{
borrow::Cow,
fmt::Display,
time::{Duration, Instant},
};
use opentelemetry::metrics::{Counter as OTCounter, ValueRecorder as OTHistogram};
pub use opentelemetry::KeyValue;
/// Attribute key under which a request's outcome is recorded.
const RED_REQUEST_STATUS_ATTRIBUTE: &str = "status";

/// Possible types of RED metric observation status.
///
/// Ok - an observed request was successful.
/// ClientError - an observed request was unsuccessful but it was not the fault
/// of the observed service.
/// Error - an observed request failed and it was the fault of the
/// service.
///
/// `ClientError` and `Error` differ in where the failure occurred. When
/// measuring SLOs like availability one calculates:
///
/// Availability = 1 - (failed_requests / all_valid_requests)
///
/// `all_valid_requests` includes successful requests and any requests that
/// failed through no fault of the service (e.g., client errors), so tracking
/// the two failure kinds separately lets operators monitor external
/// (client_error) errors independently.
#[derive(Debug)]
pub enum RedRequestStatus {
    Ok,
    ClientError,
    Error,
}

impl RedRequestStatus {
    /// Canonical attribute value for this status
    fn as_str(&self) -> &'static str {
        match self {
            Self::Error => "error",
            Self::ClientError => "client_error",
            Self::Ok => "ok",
        }
    }
}

impl Display for RedRequestStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// A REDMetric is a metric that tracks requests to some resource.
///
/// The [RED methodology](https://www.weave.works/blog/the-red-method-key-metrics-for-microservices-architecture/)
/// stipulates you should track three key measures:
///
/// - Request Rate: (total number of requests / second);
/// - Error Rate: (total number of failed requests / second);
/// - Duration: (latency distributions for the various requests)
///
/// Using a `REDMetric` makes following this methodology easy because it handles
/// updating the three components for you.
pub struct RedMetric {
    /// Counts every observed request; the "status" attribute distinguishes
    /// successful from failed requests
    requests: OTCounter<u64>,
    /// Records the latency distribution of observed requests (in seconds)
    duration: OTHistogram<f64>,
    /// Attributes attached to every observation; index 0 always holds the
    /// request status attribute inserted by `new`
    default_attributes: Vec<KeyValue>,
}

/// Workaround self-recursive OT instruments
/// <https://github.com/open-telemetry/opentelemetry-rust/issues/550>
impl std::fmt::Debug for RedMetric {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RedMetric")
            .field("default_attributes", &self.default_attributes)
            .finish()
    }
}
impl RedMetric {
    /// Builds a RedMetric from its two underlying instruments.
    ///
    /// A status attribute is always inserted at index 0 of the default
    /// attributes so observations can overwrite it in place.
    pub(crate) fn new(
        requests: OTCounter<u64>,
        duration: OTHistogram<f64>,
        mut default_attributes: Vec<KeyValue>,
    ) -> Self {
        // TODO(edd): decide what to do if `attributes` contains
        // RED_REQUEST_STATUS_ATTRIBUTE.

        // RedMetric always has a status attribute.
        default_attributes.insert(
            0,
            KeyValue::new(RED_REQUEST_STATUS_ATTRIBUTE, RedRequestStatus::Ok.as_str()),
        );

        Self {
            requests,
            duration,
            default_attributes,
        }
    }

    /// Returns a new observation that will handle timing and recording an
    /// observation the metric is tracking.
    pub fn observation(
        &self,
    ) -> RedObservation<impl Fn(RedRequestStatus, Duration, &[KeyValue]) + '_> {
        // The recording call-back
        let record =
            move |status: RedRequestStatus, duration: Duration, attributes: &[KeyValue]| {
                let mut attributes = if attributes.is_empty() {
                    // If there are no attributes specified just borrow defaults
                    Cow::Borrowed(&self.default_attributes)
                } else {
                    // Otherwise merge the provided attributes and the defaults.
                    // Note: provided attributes need to go last so that they overwrite
                    // any default attributes.
                    //
                    // PERF(edd): this seems expensive to me.
                    let mut merged: Vec<KeyValue> = self.default_attributes.clone();
                    merged.extend_from_slice(attributes);
                    Cow::Owned(merged)
                };

                // Non-Ok outcomes replace the status attribute, which `new`
                // guarantees sits at index 0 of the defaults. (This collapses
                // the previously duplicated ClientError/Error arms.)
                if !matches!(status, RedRequestStatus::Ok) {
                    attributes.to_mut()[0] =
                        KeyValue::new(RED_REQUEST_STATUS_ATTRIBUTE, status.as_str());
                }

                self.requests.add(1, &attributes);
                self.duration.record(duration.as_secs_f64(), &attributes);
            };

        RedObservation::new(record)
    }
}
/// A single in-flight observation produced by `RedMetric::observation`.
///
/// The timer starts when the observation is created (see `new`); the outcome
/// is recorded through the stored callback.
#[derive(Debug, Clone)]
pub struct RedObservation<T>
where
    T: Fn(RedRequestStatus, Duration, &[KeyValue]),
{
    /// When the observation began
    start: Instant,
    record: T, // a call-back that records the observation on the metric.
}
impl<T> RedObservation<T>
where
T: Fn(RedRequestStatus, Duration, &[KeyValue]),
{
pub(crate) fn new(record: T) -> Self {
Self {
start: std::time::Instant::now(),
record,
}
}
/// Record that an observation was successful. The duration of the
/// observation should be provided. Callers might prefer `ok` where the
/// timing will be handled for them.
pub fn observe(
&self,
observation: RedRequestStatus,
duration: Duration,
attributes: &[KeyValue],
) {
(self.record)(observation, duration, attributes);
}
/// Record that the observation was successful. Timing of observation is
/// handled automatically.
pub fn ok(&self) {
self.ok_with_attributes(&[])
}
/// Record that the observation was successful with provided attributes.
/// Timing of observation is handled automatically.
pub fn ok_with_attributes(&self, attributes: &[KeyValue]) {
let duration = self.start.elapsed();
self.observe(RedRequestStatus::Ok, duration, attributes);
}
/// Record that the observation was not successful but was still valid.
/// `client_error` is the right thing to choose when the request failed perhaps
/// due to client error. Timing of observation is handled automatically.
pub fn client_error(&self) {
self.client_error_with_attributes(&[])
}
/// Record with attributes that the observation was not successful but was still
/// valid. `client_error` is the right thing to choose when the request failed
/// perhaps due to client error. Timing of observation is handled
/// automatically.
pub fn client_error_with_attributes(&self, attributes: &[KeyValue]) {
let duration = self.start.elapsed();
self.observe(RedRequestStatus::ClientError, duration, attributes);
}
/// Record that the observation was not successful and results in an error
/// caused by the service under observation. Timing of observation is
/// handled automatically.
pub fn error(&self) {
self.error_with_attributes(&[]);
}
/// Record with attributes that the observation was not successful and results
/// in an error caused by the service under observation. Timing of
/// observation is handled automatically.
pub fn error_with_attributes(&self, attributes: &[KeyValue]) {
let duration = self.start.elapsed();
self.observe(RedRequestStatus::Error, duration, attributes);
}
}
/// A Counter is a metric exposing a monotonically increasing counter.
/// It is best used to track increases in something over time.
///
/// If you want to track some notion of success, failure and latency consider
/// using a `REDMetric` instead rather than expressing that with attributes on a
/// `Counter`.
#[derive(Clone)]
pub struct Counter {
    /// `None` for unregistered counters, in which case observations are
    /// silently dropped (see `add_with_attributes`)
    counter: Option<OTCounter<u64>>,
    /// Attributes merged into every observation
    default_attributes: Vec<KeyValue>,
}

/// Workaround self-recursive OT instruments
/// <https://github.com/open-telemetry/opentelemetry-rust/issues/550>
impl std::fmt::Debug for Counter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Counter")
            .field("default_attributes", &self.default_attributes)
            .finish()
    }
}
impl Counter {
    /// Creates a new Counter that isn't registered with and
    /// consequently won't report to any metrics registry
    pub fn new_unregistered() -> Self {
        Self {
            counter: None,
            default_attributes: Vec::new(),
        }
    }

    pub(crate) fn new(counter: OTCounter<u64>, default_attributes: Vec<KeyValue>) -> Self {
        Self {
            counter: Some(counter),
            default_attributes,
        }
    }

    /// Increase the count by `value`.
    pub fn add(&self, value: u64) {
        self.add_with_attributes(value, &[]);
    }

    /// Increase the count by `value` and associate the observation with the
    /// provided attributes.
    pub fn add_with_attributes(&self, value: u64, attributes: &[KeyValue]) {
        // Unregistered counters silently drop observations
        if let Some(counter) = &self.counter {
            let attributes = if attributes.is_empty() {
                // No extra attributes: just borrow the defaults
                Cow::Borrowed(&self.default_attributes)
            } else {
                // Merge: provided attributes go last so that they overwrite
                // any default attributes.
                //
                // PERF(edd): this seems expensive to me.
                let mut merged = self.default_attributes.clone();
                merged.extend(attributes.iter().cloned());
                Cow::Owned(merged)
            };
            counter.add(value, &attributes);
        }
    }

    /// Increase the count by 1.
    pub fn inc(&self) {
        self.add_with_attributes(1, &[]);
    }

    /// Increase the count by 1 and associate the observation with the provided
    /// attributes.
    pub fn inc_with_attributes(&self, attributes: &[KeyValue]) {
        self.add_with_attributes(1, attributes)
    }
}
/// A Histogram is a metric exposing a distribution of observations.
#[derive(Clone)]
pub struct Histogram {
    /// `None` for unregistered histograms, in which case observations are
    /// silently dropped (see `observe_with_attributes`)
    histogram: Option<OTHistogram<f64>>,
    /// Attributes merged into every observation
    default_attributes: Vec<KeyValue>,
}

/// Workaround self-recursive OT instruments
/// <https://github.com/open-telemetry/opentelemetry-rust/issues/550>
impl std::fmt::Debug for Histogram {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Histogram")
            .field("default_attributes", &self.default_attributes)
            .finish()
    }
}
impl Histogram {
    /// Creates a new Histogram that isn't registered with and
    /// consequently won't report to any metrics registry
    pub fn new_unregistered() -> Self {
        Self {
            histogram: None,
            default_attributes: Vec::new(),
        }
    }

    pub(crate) fn new(histogram: OTHistogram<f64>, default_attributes: Vec<KeyValue>) -> Self {
        Self {
            histogram: Some(histogram),
            default_attributes,
        }
    }

    /// Add a new observation to the histogram including the provided attributes.
    pub fn observe_with_attributes(&self, observation: f64, attributes: &[KeyValue]) {
        let histogram = match &self.histogram {
            Some(histogram) => histogram,
            // Unregistered histograms silently drop observations
            None => return,
        };

        let attributes = match attributes.is_empty() {
            // No extra attributes: just borrow the defaults
            true => Cow::Borrowed(&self.default_attributes),
            // Merge: provided attributes go last so that they overwrite any
            // default attributes.
            //
            // PERF(edd): this seems expensive to me.
            false => {
                let mut merged = self.default_attributes.clone();
                merged.extend_from_slice(attributes);
                Cow::Owned(merged)
            }
        };

        histogram.record(observation, &attributes);
    }

    /// Add a new observation to the histogram
    pub fn observe(&self, observation: f64) {
        self.observe_with_attributes(observation, &[]);
    }

    /// A helper method for observing latencies. Returns a new timing instrument
    /// which will handle submitting an observation containing a duration.
    pub fn timer(&self) -> HistogramTimer<'_> {
        HistogramTimer::new(self)
    }
}
/// Measures elapsed wall-clock time and submits it, in seconds, to a
/// `Histogram` when one of the `record*` methods is called
#[derive(Debug)]
pub struct HistogramTimer<'a> {
    start: Instant,
    histogram: &'a Histogram,
}

impl<'a> HistogramTimer<'a> {
    /// Starts timing against the given histogram
    pub fn new(histogram: &'a Histogram) -> Self {
        Self {
            start: Instant::now(),
            histogram,
        }
    }

    /// Consumes the timer, recording the elapsed seconds with no extra
    /// attributes
    pub fn record(self) {
        self.record_with_attributes(&[]);
    }

    /// Consumes the timer, recording the elapsed seconds together with the
    /// provided attributes
    pub fn record_with_attributes(self, attributes: &[KeyValue]) {
        let elapsed_seconds = self.start.elapsed().as_secs_f64();
        self.histogram
            .observe_with_attributes(elapsed_seconds, attributes);
    }
}

View File

@ -1,173 +0,0 @@
//! This module is a gnarly hack around the fact opentelemetry doesn't currently let you
//! register an observer multiple times with the same name
//!
//! See <https://github.com/open-telemetry/opentelemetry-rust/issues/541>
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use opentelemetry::metrics::{Meter, ObserverResult};
/// A type-erased observer callback
type CallbackFunc<T> = Box<dyn Fn(&ObserverResult<T>) + Send + Sync + 'static>;

/// A shareable, growable list of observer callbacks that are invoked together
#[derive(Clone)]
struct CallbackCollection<T>(Arc<Mutex<Vec<CallbackFunc<T>>>>);

impl<T> CallbackCollection<T> {
    /// Creates a collection seeded with a single callback
    fn new<F>(f: F) -> Self
    where
        F: Fn(&ObserverResult<T>) + Send + Sync + 'static,
    {
        let seeded: Vec<CallbackFunc<T>> = vec![Box::new(f)];
        Self(Arc::new(Mutex::new(seeded)))
    }

    /// Appends another callback to the collection
    fn push<F>(&self, f: F)
    where
        F: Fn(&ObserverResult<T>) + Send + Sync + 'static,
    {
        self.0.lock().push(Box::new(f))
    }

    /// Invokes every registered callback with the given observer result
    fn invoke(&self, observer: ObserverResult<T>) {
        for callback in self.0.lock().iter() {
            callback(&observer)
        }
    }
}
/// The registered callbacks for one observer name, tagged with the value
/// type and aggregation of the underlying instrument
enum Callbacks {
    U64Value(CallbackCollection<u64>),
    U64Sum(CallbackCollection<u64>),
    F64Value(CallbackCollection<f64>),
    F64Sum(CallbackCollection<f64>),
}

/// Manual impl: the collections hold boxed closures, which have no `Debug`,
/// so only the variant name is printed
impl std::fmt::Debug for Callbacks {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Self::U64Value(_) => "U64Value",
            Self::U64Sum(_) => "U64Sum",
            Self::F64Value(_) => "F64Value",
            Self::F64Sum(_) => "F64Sum",
        };
        f.write_str(variant)
    }
}
/// De-duplicates observer registration so that several callbacks can share a
/// single named OpenTelemetry observer (see the module docs for why)
#[derive(Debug)]
pub struct ObserverCollection {
    /// Registered callbacks keyed by observer name; the `Callbacks` variant
    /// records the value type and aggregation the name was first registered with
    callbacks: Mutex<HashMap<String, Callbacks>>,
    /// The meter that observers are ultimately registered on
    meter: Meter,
}
impl ObserverCollection {
    pub fn new(meter: Meter) -> Self {
        Self {
            callbacks: Default::default(),
            meter,
        }
    }

    /// Registers `callback` under `name` as a u64 value observer.
    ///
    /// If an observer with this name already exists, the callback is appended
    /// to it; otherwise a new observer is registered on the meter.
    ///
    /// # Panics
    ///
    /// Panics if `name` is already registered with a different metric type.
    pub fn u64_value_observer<F>(
        &self,
        name: impl Into<String>,
        description: impl Into<String>,
        callback: F,
    ) where
        F: Fn(&ObserverResult<u64>) + Send + Sync + 'static,
    {
        match self.callbacks.lock().entry(name.into()) {
            Entry::Occupied(occupied) => match occupied.get() {
                // `push` boxes the callback itself; passing it directly avoids
                // the double boxing (`Box<Box<dyn Fn..>>`) the previous
                // `push(Box::new(callback))` produced.
                Callbacks::U64Value(callbacks) => callbacks.push(callback),
                c => panic!("metric type mismatch, expected U64Value got {:?}", c),
            },
            Entry::Vacant(vacant) => {
                let name = vacant.key().clone();
                let callbacks = CallbackCollection::new(callback);
                vacant.insert(Callbacks::U64Value(callbacks.clone()));
                self.meter
                    .u64_value_observer(name, move |observer| callbacks.invoke(observer))
                    .with_description(description)
                    .init();
            }
        }
    }

    /// Registers `callback` under `name` as a u64 sum observer.
    ///
    /// # Panics
    ///
    /// Panics if `name` is already registered with a different metric type.
    pub fn u64_sum_observer<F>(
        &self,
        name: impl Into<String>,
        description: impl Into<String>,
        callback: F,
    ) where
        F: Fn(&ObserverResult<u64>) + Send + Sync + 'static,
    {
        match self.callbacks.lock().entry(name.into()) {
            Entry::Occupied(occupied) => match occupied.get() {
                // See u64_value_observer: push boxes internally
                Callbacks::U64Sum(callbacks) => callbacks.push(callback),
                c => panic!("metric type mismatch, expected U64Sum got {:?}", c),
            },
            Entry::Vacant(vacant) => {
                let name = vacant.key().clone();
                let callbacks = CallbackCollection::new(callback);
                vacant.insert(Callbacks::U64Sum(callbacks.clone()));
                self.meter
                    .u64_sum_observer(name, move |observer| callbacks.invoke(observer))
                    .with_description(description)
                    .init();
            }
        }
    }

    /// Registers `callback` under `name` as an f64 value observer.
    ///
    /// # Panics
    ///
    /// Panics if `name` is already registered with a different metric type.
    pub fn f64_value_observer<F>(
        &self,
        name: impl Into<String>,
        description: impl Into<String>,
        callback: F,
    ) where
        F: Fn(&ObserverResult<f64>) + Send + Sync + 'static,
    {
        match self.callbacks.lock().entry(name.into()) {
            Entry::Occupied(occupied) => match occupied.get() {
                // See u64_value_observer: push boxes internally
                Callbacks::F64Value(callbacks) => callbacks.push(callback),
                c => panic!("metric type mismatch, expected F64Value got {:?}", c),
            },
            Entry::Vacant(vacant) => {
                let name = vacant.key().clone();
                let callbacks = CallbackCollection::new(callback);
                vacant.insert(Callbacks::F64Value(callbacks.clone()));
                self.meter
                    .f64_value_observer(name, move |observer| callbacks.invoke(observer))
                    .with_description(description)
                    .init();
            }
        }
    }

    /// Registers `callback` under `name` as an f64 sum observer.
    ///
    /// # Panics
    ///
    /// Panics if `name` is already registered with a different metric type.
    pub fn f64_sum_observer<F>(
        &self,
        name: impl Into<String>,
        description: impl Into<String>,
        callback: F,
    ) where
        F: Fn(&ObserverResult<f64>) + Send + Sync + 'static,
    {
        match self.callbacks.lock().entry(name.into()) {
            Entry::Occupied(occupied) => match occupied.get() {
                // See u64_value_observer: push boxes internally
                Callbacks::F64Sum(callbacks) => callbacks.push(callback),
                c => panic!("metric type mismatch, expected F64Sum got {:?}", c),
            },
            Entry::Vacant(vacant) => {
                let name = vacant.key().clone();
                let callbacks = CallbackCollection::new(callback);
                vacant.insert(Callbacks::F64Sum(callbacks.clone()));
                self.meter
                    .f64_sum_observer(name, move |observer| callbacks.invoke(observer))
                    .with_description(description)
                    .init();
            }
        }
    }
}

View File

@ -1,680 +0,0 @@
use std::{env, fmt, sync::Arc};
use snafu::{ensure, OptionExt, Snafu};
use prometheus::proto::{
Counter as PromCounter, Gauge as PromGauge, Histogram as PromHistogram, MetricFamily,
};
use crate::MetricRegistry;
/// A string whose contents are only rendered when the
/// `METRICS_DETAILED_OUTPUT` environment variable is set, keeping assertion
/// failure output short by default
struct OptInString(String);

/// Display intentionally delegates to `Debug` so both renderings honour the
/// opt-in check below
impl fmt::Display for OptInString {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl fmt::Debug for OptInString {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The env var is consulted on every format call, so it can be set at
        // any point before the output is produced
        if env::var_os("METRICS_DETAILED_OUTPUT").is_some() {
            fmt::Display::fmt(&self.0, f)
        } else {
            "Output disabled; set the METRICS_DETAILED_OUTPUT environment variable to see it".fmt(f)
        }
    }
}

impl From<String> for OptInString {
    fn from(other: String) -> Self {
        Self(other)
    }
}

impl From<&String> for OptInString {
    fn from(other: &String) -> Self {
        Self(other.clone())
    }
}
/// Internal error cases for metric assertions, wrapped by the public opaque
/// `Error` type. Each variant carries a dump of the registry's metrics
/// (`metrics`) to aid debugging failed assertions.
#[derive(Debug, Snafu)]
enum InnerError {
    #[snafu(display("no metric family with name: {}\n{}", name, metrics))]
    MetricFamilyNotFound { name: String, metrics: String },

    #[snafu(display(
        "attributes {:?} do not match metric: {}\n{}",
        attributes,
        name,
        metrics
    ))]
    NoMatchingAttributes {
        attributes: Vec<(String, String)>,
        name: String,
        // OptInString keeps the potentially large metric dump hidden unless
        // explicitly opted into
        metrics: OptInString,
    },

    #[snafu(display("bucket {:?} is not in metric family: {}\n{}", bound, name, metrics))]
    HistogramBucketNotFound {
        bound: f64,
        name: String,
        metrics: OptInString,
    },

    #[snafu(display("metric '{}' failed assertion: '{}'\n{}", name, msg, metrics))]
    FailedMetricAssertion {
        name: String,
        msg: String,
        metrics: OptInString,
    },
}
/// Public, opaque error type wrapping `InnerError` so the individual variants
/// stay private to this module
#[derive(Snafu)]
pub struct Error(InnerError);

impl fmt::Debug for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pretty-print the inner error so multi-line metric dumps stay readable
        write!(f, "{:#?}", self.0)
    }
}

/// Convenience alias defaulting the error type to this module's `Error`
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A metric registry with handy helpers for asserting conditions on metrics.
///
/// You can either initialise a default `TestMetricRegistry` and use the
/// `registry()` method to inject a `MetricRegistry` wherever you need it.
/// Alternatively you can provide an existing `MetricRegistry` via `new`.
///
///
/// The main purpose of the `TestRegistry` is to provide a simple API to assert
/// that metrics exist and have certain values.
///
/// Please see the test cases at the top of this crate for example usage.
#[derive(Debug, Default)]
pub struct TestMetricRegistry {
    /// The registry under test; `Arc` so the code under test can share it
    registry: Arc<MetricRegistry>,
}
impl TestMetricRegistry {
    /// Wraps an existing registry so assertions can be made against it
    pub fn new(registry: Arc<MetricRegistry>) -> Self {
        Self { registry }
    }

    /// Returns a handle to the wrapped registry, suitable for injecting into
    /// the code under test
    pub fn registry(&self) -> Arc<MetricRegistry> {
        Arc::clone(&self.registry)
    }

    /// Returns an assertion builder for the named metric family, or an error
    /// if no such family exists.
    ///
    /// Note: Prometheus histograms comprise multiple metrics sharing the same
    /// family. Use the family name, e.g., `http_request_duration_seconds`, and
    /// reach the individual metrics via the `histogram` method on the returned
    /// `AssertionBuilder`.
    pub fn try_has_metric_family<'a>(&'a self, name: &str) -> Result<AssertionBuilder<'a>, Error> {
        let gathered = self.registry.exporter.registry().gather();
        let family = gathered
            .into_iter()
            .find(|candidate| candidate.get_name() == name)
            .context(MetricFamilyNotFound {
                name,
                metrics: self.registry.metrics_as_str(),
            })?;

        Ok(AssertionBuilder::new(family, &self.registry))
    }

    /// Returns an assertion builder for the named metric family.
    ///
    /// # Panics
    ///
    /// Panics if no metric family has `name`; see `try_has_metric_family` for
    /// the fallible variant.
    pub fn has_metric_family<'a>(&'a self, name: &str) -> AssertionBuilder<'a> {
        self.try_has_metric_family(name).unwrap()
    }
}
/// Builds assertions against a single metric family, optionally narrowed to
/// the one metric within the family that carries a specific attribute
/// (label) set.
#[derive(Debug)]
pub struct AssertionBuilder<'a> {
    // The Prometheus metric family the assertions run against.
    family: MetricFamily,
    // Attribute key/value pairs used to select a single metric within the
    // family; populated via `with_attributes`.
    attributes: Vec<(String, String)>,
    // Registry handle used to dump all metrics into error messages.
    registry: &'a MetricRegistry,
}
impl<'a> AssertionBuilder<'a> {
    fn new(family: MetricFamily, registry: &'a MetricRegistry) -> Self {
        Self {
            family,
            attributes: vec![],
            registry,
        }
    }

    /// Assert that the metric has the following set of attributes.
    pub fn with_attributes(mut self, attributes: &[(&str, &str)]) -> Self {
        for (key, value) in attributes {
            self.attributes.push((key.to_string(), value.to_string()));
        }
        self
    }

    /// Finds the index of the metric in the family whose label set matches
    /// `self.attributes` exactly: same number of labels and identical
    /// key/value pairs after sorting both sides by key.
    ///
    /// Sorts `self.attributes` in place so the two sorted sequences can be
    /// compared pairwise. Previously this scan was duplicated verbatim in
    /// `counter`, `gauge` and `histogram`.
    fn matching_metric_index(&mut self) -> Option<usize> {
        // sort the assertion's attributes
        self.attributes.sort_by(|a, b| a.0.cmp(&b.0));

        self.family.get_metric().iter().position(|metric| {
            if metric.get_label().len() != self.attributes.len() {
                return false; // this metric can't match
            }

            // sort this metric's attributes and compare to assertion attributes
            let mut metric_attributes = metric.get_label().to_vec();
            metric_attributes.sort_by(|a, b| a.get_name().cmp(b.get_name()));

            // metric only matches if all attributes are identical.
            metric_attributes
                .iter()
                .zip(self.attributes.iter())
                .all(|(a, b)| a.get_name() == b.0 && a.get_value() == b.1)
        })
    }

    /// Returns the counter metric, allowing assertions to be applied.
    pub fn counter(&mut self) -> CounterAssertion<'_> {
        let metric = match self.matching_metric_index() {
            Some(idx) => &self.family.get_metric()[idx],
            // Can't find metric matching attributes
            None => {
                return CounterAssertion {
                    c: NoMatchingAttributes {
                        name: self.family.get_name().to_owned(),
                        attributes: self.attributes.clone(),
                        metrics: self.registry.metrics_as_str(),
                    }
                    .fail()
                    .map_err(Into::into),
                    family_name: "".to_string(),
                    metric_dump: "".to_string(),
                };
            }
        };

        if !metric.has_counter() {
            return CounterAssertion {
                c: FailedMetricAssertion {
                    name: self.family.get_name().to_owned(),
                    msg: "metric not a counter".to_owned(),
                    metrics: self.registry.metrics_as_str(),
                }
                .fail()
                .map_err(Into::into),
                family_name: "".to_string(),
                metric_dump: "".to_string(),
            };
        }

        CounterAssertion {
            c: Ok(metric.get_counter()),
            family_name: self.family.get_name().to_owned(),
            metric_dump: self.registry.metrics_as_str(),
        }
    }

    /// Returns the gauge metric, allowing assertions to be applied.
    pub fn gauge(&mut self) -> GaugeAssertion<'_> {
        let metric = match self.matching_metric_index() {
            Some(idx) => &self.family.get_metric()[idx],
            // Can't find metric matching attributes
            None => {
                return GaugeAssertion {
                    c: NoMatchingAttributes {
                        name: self.family.get_name().to_owned(),
                        attributes: self.attributes.clone(),
                        metrics: self.registry.metrics_as_str(),
                    }
                    .fail()
                    .map_err(Into::into),
                    family_name: "".to_string(),
                    metric_dump: "".to_string(),
                };
            }
        };

        if !metric.has_gauge() {
            return GaugeAssertion {
                c: FailedMetricAssertion {
                    name: self.family.get_name().to_owned(),
                    msg: "metric not a gauge".to_owned(),
                    metrics: self.registry.metrics_as_str(),
                }
                .fail()
                .map_err(Into::into),
                family_name: "".to_string(),
                metric_dump: "".to_string(),
            };
        }

        GaugeAssertion {
            c: Ok(metric.get_gauge()),
            family_name: self.family.get_name().to_owned(),
            metric_dump: self.registry.metrics_as_str(),
        }
    }

    /// Returns the histogram metric, allowing assertions to be applied.
    pub fn histogram(&mut self) -> Histogram<'_> {
        let metric = match self.matching_metric_index() {
            Some(idx) => &self.family.get_metric()[idx],
            // Can't find metric matching attributes
            None => {
                return Histogram {
                    c: NoMatchingAttributes {
                        name: self.family.get_name().to_owned(),
                        attributes: self.attributes.clone(),
                        metrics: self.registry.metrics_as_str(),
                    }
                    .fail()
                    .map_err(Into::into),
                    family_name: "".to_string(),
                    metric_dump: "".to_string(),
                };
            }
        };

        if !metric.has_histogram() {
            return Histogram {
                c: FailedMetricAssertion {
                    name: self.family.get_name().to_owned(),
                    // BUG FIX: previously said "metric not a counter" — a
                    // copy-paste from `counter()` that produced a misleading
                    // error when the metric was not a histogram.
                    msg: "metric not a histogram".to_owned(),
                    metrics: self.registry.metrics_as_str(),
                }
                .fail()
                .map_err(Into::into),
                family_name: "".to_string(),
                metric_dump: "".to_string(),
            };
        }

        Histogram {
            c: Ok(metric.get_histogram()),
            family_name: self.family.get_name().to_owned(),
            metric_dump: self.registry.metrics_as_str(),
        }
    }
}
/// Assertion helpers for a single Prometheus counter metric.
#[derive(Debug)]
pub struct CounterAssertion<'a> {
    // if there was a problem getting the counter based on attributes then the
    // Error will contain details.
    c: Result<&'a PromCounter, Error>,
    // Metric family name, embedded in assertion failure messages.
    family_name: String,
    // Full textual dump of the registry at lookup time, embedded in
    // assertion failure messages for context.
    metric_dump: String,
}
impl<'a> CounterAssertion<'a> {
pub fn eq(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
v == c.get_value(),
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} == {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn gte(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() >= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} >= {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn gt(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() > v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} > {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn lte(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() <= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} <= {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn lt(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() < v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} < {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
}
/// Assertion helpers for a single Prometheus gauge metric.
#[derive(Debug)]
pub struct GaugeAssertion<'a> {
    // if there was a problem getting the gauge based on attributes then the
    // Error will contain details.
    c: Result<&'a PromGauge, Error>,
    // Metric family name, embedded in assertion failure messages.
    family_name: String,
    // Full textual dump of the registry at lookup time, embedded in
    // assertion failure messages for context.
    metric_dump: String,
}
impl<'a> GaugeAssertion<'a> {
pub fn eq(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
v == c.get_value(),
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} == {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn gte(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() >= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} >= {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn gt(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() > v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} > {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn lte(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() <= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} <= {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn lt(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_value() < v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} < {:?} failed", c.get_value(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
}
/// Assertion helpers for a single Prometheus histogram metric.
#[derive(Debug)]
pub struct Histogram<'a> {
    // if there was a problem getting the histogram based on attributes then
    // the Error will contain details.
    c: Result<&'a PromHistogram, Error>,
    // Metric family name, embedded in assertion failure messages.
    family_name: String,
    // Full textual dump of the registry at lookup time, embedded in
    // assertion failure messages for context.
    metric_dump: String,
}
impl<'a> Histogram<'a> {
pub fn bucket_cumulative_count_eq(self, bound: f64, count: u64) -> Result<(), Error> {
let c = self.c?; // return previous errors
let bucket = c
.get_bucket()
.iter()
.find(|bucket| bucket.get_upper_bound() == bound)
.context(HistogramBucketNotFound {
bound,
name: &self.family_name,
metrics: &self.metric_dump,
})?;
ensure!(
count == bucket.get_cumulative_count(),
FailedMetricAssertion {
name: &self.family_name,
msg: format!("{:?} == {:?} failed", bucket.get_cumulative_count(), count),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_sum_eq(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
v == c.get_sample_sum(),
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} == {:?} failed", c.get_sample_sum(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_sum_gte(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_sum() >= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} >= {:?} failed", c.get_sample_sum(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_sum_gt(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_sum() > v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} > {:?} failed", c.get_sample_sum(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_sum_lte(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_sum() <= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} <= {:?} failed", c.get_sample_sum(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_sum_lt(self, v: f64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_sum() < v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} < {:?} failed", c.get_sample_sum(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_count_eq(self, v: u64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_count() == v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} == {:?} failed", c.get_sample_count(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_count_gte(self, v: u64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_count() >= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} >= {:?} failed", c.get_sample_count(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_count_gt(self, v: u64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_count() > v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} > {:?} failed", c.get_sample_count(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_count_lte(self, v: u64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_count() <= v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} <= {:?} failed", c.get_sample_count(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
pub fn sample_count_lt(self, v: u64) -> Result<(), Error> {
let c = self.c?; // return previous errors
ensure!(
c.get_sample_count() < v,
FailedMetricAssertion {
name: self.family_name,
msg: format!("{:?} < {:?} failed", c.get_sample_count(), v),
metrics: self.metric_dump,
}
);
Ok(())
}
}

View File

@ -56,7 +56,7 @@ pub struct ChunkMetrics {
impl ChunkMetrics {
/// Creates an instance of ChunkMetrics that isn't registered with a central
/// metrics registry. Observations made to instruments on this ChunkMetrics instance
/// metric registry. Observations made to instruments on this ChunkMetrics instance
/// will therefore not be visible to other ChunkMetrics instances or metric instruments
/// created on a metrics domain, and vice versa
pub fn new_unregistered() -> Self {

View File

@ -64,7 +64,7 @@ pub struct ChunkMetrics {
impl ChunkMetrics {
/// Creates an instance of ChunkMetrics that isn't registered with a central
/// metrics registry. Observations made to instruments on this ChunkMetrics instance
/// metric registry. Observations made to instruments on this ChunkMetrics instance
/// will therefore not be visible to other ChunkMetrics instances or metric instruments
/// created on a metrics domain, and vice versa
pub fn new_unregistered() -> Self {

View File

@ -57,7 +57,7 @@ async fn chunk_pruning_sql() {
// Test that partition pruning is connected up
let TestDb {
db,
metrics_registry_v2,
metric_registry,
..
} = setup().await;
@ -79,7 +79,7 @@ async fn chunk_pruning_sql() {
let attributes = Attributes::from(&[("db_name", "placeholder"), ("table_name", "cpu")]);
// Validate that the chunk was pruned using the metrics
let pruned_chunks = metrics_registry_v2
let pruned_chunks = metric_registry
.get_instrument::<Metric<U64Counter>>("query_access_pruned_chunks")
.unwrap()
.get_observer(&attributes)
@ -88,7 +88,7 @@ async fn chunk_pruning_sql() {
assert_eq!(pruned_chunks, 1);
// Validate that the chunk was pruned using the metrics
let pruned_rows = metrics_registry_v2
let pruned_rows = metric_registry
.get_instrument::<Metric<U64Counter>>("query_access_pruned_rows")
.unwrap()
.get_observer(&attributes)
@ -103,7 +103,7 @@ async fn chunk_pruning_influxrpc() {
// Test that partition pruning is connected up
let TestDb {
db,
metrics_registry_v2,
metric_registry,
..
} = setup().await;
@ -127,7 +127,7 @@ async fn chunk_pruning_influxrpc() {
let attributes = Attributes::from(&[("db_name", "placeholder"), ("table_name", "cpu")]);
// Validate that the chunk was pruned using the metrics
let pruned_chunks = metrics_registry_v2
let pruned_chunks = metric_registry
.get_instrument::<Metric<U64Counter>>("query_access_pruned_chunks")
.unwrap()
.get_observer(&attributes)
@ -136,7 +136,7 @@ async fn chunk_pruning_influxrpc() {
assert_eq!(pruned_chunks, 1);
// Validate that the chunk was pruned using the metrics
let pruned_rows = metrics_registry_v2
let pruned_rows = metric_registry
.get_instrument::<Metric<U64Counter>>("query_access_pruned_rows")
.unwrap()
.get_observer(&attributes)

View File

@ -397,9 +397,9 @@ impl ChunkMetrics {
}
/// Creates an instance of ChunkMetrics that isn't registered with a central
/// metrics registry. Observations made to instruments on this ChunkMetrics instance
/// metric registry. Observations made to instruments on this ChunkMetrics instance
/// will therefore not be visible to other ChunkMetrics instances or metric instruments
/// created on a metrics registry
/// created on a metric registry
pub fn new_unregistered() -> Self {
Self {
base_attributes: Attributes::from([]),

View File

@ -30,7 +30,6 @@ iox_object_store = { path = "../iox_object_store" }
itertools = "0.10.1"
lifecycle = { path = "../lifecycle" }
metric = { path = "../metric" }
metrics = { path = "../metrics" }
mutable_buffer = { path = "../mutable_buffer" }
num_cpus = "1.13.0"
object_store = { path = "../object_store" }

View File

@ -1,6 +1,5 @@
use std::sync::Arc;
use metrics::MetricRegistry;
use object_store::ObjectStore;
use observability_deps::tracing::info;
use query::exec::Executor;
@ -16,8 +15,7 @@ pub struct ApplicationState {
write_buffer_factory: Arc<WriteBufferConfigFactory>,
executor: Arc<Executor>,
job_registry: Arc<JobRegistry>,
metric_registry: Arc<MetricRegistry>,
metric_registry_v2: Arc<metric::Registry>,
metric_registry: Arc<metric::Registry>,
}
impl ApplicationState {
@ -43,16 +41,15 @@ impl ApplicationState {
let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
let metric_registry_v2 = Arc::new(metric::Registry::new());
let job_registry = Arc::new(JobRegistry::new(Arc::clone(&metric_registry_v2)));
let metric_registry = Arc::new(metric::Registry::new());
let job_registry = Arc::new(JobRegistry::new(Arc::clone(&metric_registry)));
Self {
object_store,
write_buffer_factory,
executor: Arc::new(Executor::new(num_threads)),
job_registry,
metric_registry: Arc::new(metrics::MetricRegistry::new()),
metric_registry_v2,
metric_registry,
}
}
@ -68,14 +65,10 @@ impl ApplicationState {
&self.job_registry
}
pub fn metric_registry(&self) -> &Arc<MetricRegistry> {
pub fn metric_registry(&self) -> &Arc<metric::Registry> {
&self.metric_registry
}
pub fn metric_registry_v2(&self) -> &Arc<metric::Registry> {
&self.metric_registry_v2
}
pub fn executor(&self) -> &Arc<Executor> {
&self.executor
}

View File

@ -7,6 +7,7 @@ use crate::{
rules::ProvidedDatabaseRules,
ApplicationState, Db,
};
use chrono::{DateTime, Utc};
use data_types::{database_rules::WriteBufferDirection, server_id::ServerId, DatabaseName};
use futures::{
future::{BoxFuture, FusedFuture, Shared},
@ -26,6 +27,8 @@ use tokio_util::sync::CancellationToken;
const INIT_BACKOFF: Duration = Duration::from_secs(1);
mod metrics;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
@ -85,6 +88,26 @@ pub enum Error {
NoActiveDatabaseToDelete { db_name: String },
}
#[derive(Debug, Snafu)]
pub enum WriteError {
#[snafu(context(false))]
DbError { source: super::db::Error },
#[snafu(display("write buffer producer error: {}", source))]
WriteBuffer {
source: Box<dyn std::error::Error + Sync + Send>,
},
#[snafu(display("writing only allowed through write buffer"))]
WritingOnlyAllowedThroughWriteBuffer,
#[snafu(display("database not initialized: {}", state))]
NotInitialized { state: DatabaseStateCode },
#[snafu(display("Hard buffer size limit reached"))]
HardLimitReached {},
}
/// A `Database` represents a single configured IOx database - i.e. an
/// entity with a corresponding set of `DatabaseRules`.
///
@ -123,12 +146,16 @@ impl Database {
"new database"
);
let metrics =
metrics::Metrics::new(application.metric_registry().as_ref(), config.name.as_str());
let shared = Arc::new(DatabaseShared {
config,
application,
shutdown: Default::default(),
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
state_notify: Default::default(),
metrics,
});
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
@ -162,11 +189,8 @@ impl Database {
.context(SavingRules)?;
create_preserved_catalog(
db_name.as_str(),
Arc::clone(&iox_object_store),
server_id,
Arc::clone(application.metric_registry()),
Arc::clone(application.metric_registry_v2()),
true,
)
.await
@ -462,6 +486,48 @@ impl Database {
Ok(())
})
}
/// Writes an entry to this `Database` this will either:
///
/// - write it to a write buffer
/// - write it to a local `Db`
///
pub async fn write_entry(
&self,
entry: entry::Entry,
time_of_write: DateTime<Utc>,
) -> Result<(), WriteError> {
let recorder = self.shared.metrics.entry_ingest(entry.data().len());
let db = {
let state = self.shared.state.read();
match &**state {
DatabaseState::Initialized(initialized) => match &initialized.write_buffer_consumer
{
Some(_) => return Err(WriteError::WritingOnlyAllowedThroughWriteBuffer),
None => Arc::clone(&initialized.db),
},
state => {
return Err(WriteError::NotInitialized {
state: state.state_code(),
})
}
}
};
db.store_entry(entry, time_of_write).await.map_err(|e| {
use super::db::Error;
match e {
// TODO: Pull write buffer producer out of Db
Error::WriteBufferWritingError { source } => WriteError::WriteBuffer { source },
Error::HardLimitReached {} => WriteError::HardLimitReached {},
e => e.into(),
}
})?;
recorder.success();
Ok(())
}
}
impl Drop for Database {
@ -497,6 +563,9 @@ struct DatabaseShared {
/// Notify that the database state has changed
state_notify: Notify,
/// Metrics for this database
metrics: metrics::Metrics,
}
/// The background worker for `Database` - there should only ever be one
@ -953,9 +1022,7 @@ impl DatabaseStateRulesLoaded {
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
shared.config.name.as_str(),
Arc::clone(&self.iox_object_store),
shared.config.server_id,
Arc::clone(shared.application.metric_registry()),
Arc::clone(shared.application.metric_registry_v2()),
shared.config.wipe_catalog_on_error,
shared.config.skip_replay,
)
@ -983,7 +1050,7 @@ impl DatabaseStateRulesLoaded {
preserved_catalog,
catalog,
write_buffer_producer: producer,
metrics_registry_v2: Arc::clone(shared.application.metric_registry_v2()),
metric_registry: Arc::clone(shared.application.metric_registry()),
};
let db = Db::new(
@ -1037,7 +1104,7 @@ impl DatabaseStateCatalogLoaded {
Some(Arc::new(WriteBufferConsumer::new(
consumer,
Arc::clone(&db),
shared.application.metric_registry_v2().as_ref(),
shared.application.metric_registry().as_ref(),
)))
}
_ => None,

View File

@ -0,0 +1,58 @@
use metric::U64Counter;
#[derive(Debug)]
pub struct Metrics {
ingest_entries_bytes_ok: U64Counter,
ingest_entries_bytes_error: U64Counter,
}
impl Metrics {
pub fn new(registry: &metric::Registry, db_name: impl Into<String>) -> Self {
let db_name = db_name.into();
let metric = registry
.register_metric::<U64Counter>("ingest_entries_bytes", "total ingested entry bytes");
Self {
ingest_entries_bytes_ok: metric
.recorder([("db_name", db_name.clone().into()), ("status", "ok".into())]),
ingest_entries_bytes_error: metric
.recorder([("db_name", db_name.into()), ("status", "error".into())]),
}
}
/// Get a recorder for reporting entry ingest
pub fn entry_ingest(&self, bytes: usize) -> EntryIngestRecorder<'_> {
EntryIngestRecorder {
metrics: self,
recorded: false,
bytes,
}
}
}
/// An RAII handle that records metrics for ingest
///
/// Records an error on drop unless `EntryIngestRecorder::success` invoked
pub struct EntryIngestRecorder<'a> {
metrics: &'a Metrics,
bytes: usize,
recorded: bool,
}
impl<'a> EntryIngestRecorder<'a> {
pub fn success(mut self) {
self.recorded = true;
self.metrics.ingest_entries_bytes_ok.inc(self.bytes as u64)
}
}
impl<'a> Drop for EntryIngestRecorder<'a> {
fn drop(&mut self) {
if !self.recorded {
self.metrics
.ingest_entries_bytes_error
.inc(self.bytes as u64);
}
}
}

View File

@ -29,7 +29,6 @@ use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, Sequence, SequencedEntry, TableBatch};
use internal_types::schema::Schema;
use iox_object_store::IoxObjectStore;
use metrics::KeyValue;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use observability_deps::tracing::{debug, error, info};
use parquet_file::catalog::{
@ -257,11 +256,8 @@ pub struct Db {
/// A handle to the global jobs registry for long running tasks
jobs: Arc<JobRegistry>,
/// The metrics registry to inject into created components in the Db.
metrics_registry: Arc<metrics::MetricRegistry>,
/// The global metrics registry
metrics_registry_v2: Arc<metric::Registry>,
/// The global metric registry
metric_registry: Arc<metric::Registry>,
/// Catalog interface for query
catalog_access: Arc<QueryCatalogAccess>,
@ -272,9 +268,6 @@ pub struct Db {
/// Number of iterations of the worker cleanup loop for this Db
worker_iterations_cleanup: AtomicUsize,
/// Metric attributes
metric_attributes: Vec<KeyValue>,
/// Optional write buffer producer
/// TODO: Move onto Database
write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
@ -312,7 +305,7 @@ pub(crate) struct DatabaseToCommit {
/// TODO: Move onto Database
pub(crate) write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
pub(crate) metrics_registry_v2: Arc<metric::Registry>,
pub(crate) metric_registry: Arc<metric::Registry>,
}
impl Db {
@ -322,8 +315,6 @@ impl Db {
let rules = RwLock::new(database_to_commit.rules);
let server_id = database_to_commit.server_id;
let iox_object_store = Arc::clone(&database_to_commit.iox_object_store);
let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry);
let metric_attributes = database_to_commit.catalog.metric_attributes.clone();
let catalog = Arc::new(database_to_commit.catalog);
@ -331,7 +322,7 @@ impl Db {
&db_name,
Arc::clone(&catalog),
Arc::clone(&jobs),
database_to_commit.metrics_registry_v2.as_ref(),
database_to_commit.metric_registry.as_ref(),
);
let catalog_access = Arc::new(catalog_access);
@ -343,12 +334,10 @@ impl Db {
preserved_catalog: Arc::new(database_to_commit.preserved_catalog),
catalog,
jobs,
metrics_registry,
metrics_registry_v2: database_to_commit.metrics_registry_v2,
metric_registry: database_to_commit.metric_registry,
catalog_access,
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_attributes,
write_buffer_producer: database_to_commit.write_buffer_producer,
cleanup_lock: Default::default(),
lifecycle_policy: tokio::sync::Mutex::new(None),
@ -1062,7 +1051,7 @@ impl Db {
}
None => {
let chunk_result = MBChunk::new(
MutableBufferChunkMetrics::new(self.metrics_registry_v2.as_ref()),
MutableBufferChunkMetrics::new(self.metric_registry.as_ref()),
table_batch,
mask.as_ref().map(|x| x.as_ref()),
)
@ -1589,7 +1578,7 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
let registry = test_db.metrics_registry_v2.as_ref();
let registry = test_db.metric_registry.as_ref();
// A chunk has been opened
assert_storage_gauge(registry, "catalog_loaded_chunks", "mutable_buffer", 1);
@ -1703,7 +1692,7 @@ mod tests {
summary.record(Utc.timestamp_nanos(650000000010));
let mut reporter = metric::RawReporter::default();
test_db.metrics_registry_v2.report(&mut reporter);
test_db.metric_registry.report(&mut reporter);
let observation = reporter
.metric("catalog_row_time")
@ -1849,7 +1838,7 @@ mod tests {
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
assert_batches_eq!(&expected, &batches);
let registry = test_db.metrics_registry_v2.as_ref();
let registry = test_db.metric_registry.as_ref();
// A chunk is now in the read buffer
assert_storage_gauge(registry, "catalog_loaded_chunks", "read_buffer", 1);
@ -1976,7 +1965,7 @@ mod tests {
let mb = collect_read_filter(&mb_chunk).await;
let registry = test_db.metrics_registry_v2.as_ref();
let registry = test_db.metric_registry.as_ref();
// MUB chunk size
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 3607);
@ -2080,7 +2069,7 @@ mod tests {
.await
.unwrap();
let registry = test_db.metrics_registry_v2.as_ref();
let registry = test_db.metric_registry.as_ref();
// Read buffer + Parquet chunk size
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
@ -2189,7 +2178,7 @@ mod tests {
vec![pq_chunk_id]
);
let registry = test_db.metrics_registry_v2.as_ref();
let registry = test_db.metric_registry.as_ref();
// Read buffer + Parquet chunk size
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
@ -3073,7 +3062,7 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
let mut reporter = metric::RawReporter::default();
test_db.metrics_registry_v2.report(&mut reporter);
test_db.metric_registry.report(&mut reporter);
let exclusive = reporter
.metric("catalog_lock")
@ -3124,7 +3113,7 @@ mod tests {
task.await.unwrap();
let mut reporter = metric::RawReporter::default();
test_db.metrics_registry_v2.report(&mut reporter);
test_db.metric_registry.report(&mut reporter);
let exclusive = reporter
.metric("catalog_lock")

View File

@ -91,35 +91,21 @@ pub struct Catalog {
tables: RwLock<HashMap<Arc<str>, Table>>,
metrics: Arc<CatalogMetrics>,
pub(crate) metrics_registry: Arc<::metrics::MetricRegistry>,
pub(crate) metric_attributes: Vec<::metrics::KeyValue>,
}
impl Catalog {
#[cfg(test)]
fn test() -> Self {
let registry = Arc::new(::metrics::MetricRegistry::new());
Self::new(Arc::from("test"), registry, Default::default(), vec![])
Self::new(Arc::from("test"), Default::default())
}
pub fn new(
db_name: Arc<str>,
metrics_registry: Arc<::metrics::MetricRegistry>,
metrics_registry_v2: Arc<::metric::Registry>,
metric_attributes: Vec<::metrics::KeyValue>,
) -> Self {
let metrics = Arc::new(CatalogMetrics::new(
Arc::clone(&db_name),
metrics_registry_v2,
));
pub fn new(db_name: Arc<str>, metric_registry: Arc<::metric::Registry>) -> Self {
let metrics = Arc::new(CatalogMetrics::new(Arc::clone(&db_name), metric_registry));
Self {
db_name,
tables: Default::default(),
metrics,
metrics_registry,
metric_attributes,
}
}

View File

@ -253,7 +253,7 @@ pub struct ChunkMetrics {
impl ChunkMetrics {
/// Creates an instance of ChunkMetrics that isn't registered with a central
/// metrics registry. Observations made to instruments on this ChunkMetrics instance
/// metric registry. Observations made to instruments on this ChunkMetrics instance
/// will therefore not be visible to other ChunkMetrics instances or metric instruments
/// created on a metrics domain, and vice versa
pub fn new_unregistered() -> Self {

View File

@ -22,15 +22,15 @@ pub struct CatalogMetrics {
/// Name of the database
db_name: Arc<str>,
/// Metrics registry
metrics_registry: Arc<metric::Registry>,
/// Metric registry
metric_registry: Arc<metric::Registry>,
/// Catalog memory metrics
memory_metrics: StorageGauge,
}
impl CatalogMetrics {
pub fn new(db_name: Arc<str>, metrics_registry: Arc<metric::Registry>) -> Self {
let chunks_mem_usage = metrics_registry.register_metric(
pub fn new(db_name: Arc<str>, metric_registry: Arc<metric::Registry>) -> Self {
let chunks_mem_usage = metric_registry.register_metric(
"catalog_chunks_mem_usage_bytes",
"Memory usage by catalog chunks",
);
@ -40,7 +40,7 @@ impl CatalogMetrics {
Self {
db_name,
metrics_registry,
metric_registry,
memory_metrics,
}
}
@ -59,34 +59,34 @@ impl CatalogMetrics {
let mut lock_attributes = base_attributes.clone();
lock_attributes.insert("lock", "table");
let table_lock_metrics = Arc::new(LockMetrics::new(
self.metrics_registry.as_ref(),
self.metric_registry.as_ref(),
lock_attributes.clone(),
));
lock_attributes.insert("lock", "partition");
let partition_lock_metrics = Arc::new(LockMetrics::new(
self.metrics_registry.as_ref(),
self.metric_registry.as_ref(),
lock_attributes.clone(),
));
lock_attributes.insert("lock", "chunk");
let chunk_lock_metrics = Arc::new(LockMetrics::new(
self.metrics_registry.as_ref(),
self.metric_registry.as_ref(),
lock_attributes,
));
let storage_gauge = self.metrics_registry.register_metric(
let storage_gauge = self.metric_registry.register_metric(
"catalog_loaded_chunks",
"The number of chunks loaded in a each chunk storage location",
);
let row_gauge = self.metrics_registry.register_metric(
let row_gauge = self.metric_registry.register_metric(
"catalog_loaded_rows",
"The number of rows loaded in each chunk storage location",
);
let timestamp_histogram = report_timestamp_metrics(table_name).then(|| {
TimestampHistogram::new(self.metrics_registry.as_ref(), base_attributes.clone())
TimestampHistogram::new(self.metric_registry.as_ref(), base_attributes.clone())
});
let chunk_storage = StorageGauge::new(&storage_gauge, base_attributes.clone());

View File

@ -82,7 +82,7 @@ pub(crate) fn compact_chunks(
let time_of_first_write = time_of_first_write.expect("Should have had a first write somewhere");
let time_of_last_write = time_of_last_write.expect("Should have had a last write somewhere");
let metric_registry = Arc::clone(&db.metrics_registry_v2);
let metric_registry = Arc::clone(&db.metric_registry);
let ctx = db.exec.new_context(ExecutorType::Reorg);
let fut = async move {

View File

@ -41,7 +41,7 @@ pub fn move_chunk_to_read_buffer(
// Drop locks
let chunk = guard.into_data().chunk;
let metric_registry = Arc::clone(&db.metrics_registry_v2);
let metric_registry = Arc::clone(&db.metric_registry);
let ctx = db.exec.new_context(ExecutorType::Reorg);
let fut = async move {

View File

@ -87,7 +87,7 @@ pub fn persist_chunks(
let time_of_first_write = time_of_first_write.expect("Should have had a first write somewhere");
let time_of_last_write = time_of_last_write.expect("Should have had a last write somewhere");
let metric_registry = Arc::clone(&db.metrics_registry_v2);
let metric_registry = Arc::clone(&db.metric_registry);
let ctx = db.exec.new_context(ExecutorType::Reorg);
let fut = async move {

View File

@ -131,7 +131,7 @@ pub(super) fn write_chunk_to_object_store(
.context(WritingToObjectStore)?;
let parquet_metadata = Arc::new(parquet_metadata);
let metrics = ParquetChunkMetrics::new(db.metrics_registry_v2.as_ref());
let metrics = ParquetChunkMetrics::new(db.metric_registry.as_ref());
let parquet_chunk = Arc::new(
ParquetChunk::new(
&path,

View File

@ -2,9 +2,7 @@
//! [`PreservedCatalog`](parquet_file::catalog::api::PreservedCatalog).
use super::catalog::{chunk::ChunkStage, table::TableSchemaUpsertHandle, Catalog};
use data_types::server_id::ServerId;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use metrics::{KeyValue, MetricRegistry};
use observability_deps::tracing::{error, info};
use parquet_file::{
catalog::api::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
@ -48,22 +46,14 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub async fn load_or_create_preserved_catalog(
db_name: &str,
iox_object_store: Arc<IoxObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
metrics_registry_v2: Arc<::metric::Registry>,
metric_registry: Arc<::metric::Registry>,
wipe_on_error: bool,
skip_replay: bool,
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
// first try to load existing catalogs
match PreservedCatalog::load(
Arc::clone(&iox_object_store),
LoaderEmptyInput::new(
db_name,
server_id,
Arc::clone(&metrics_registry),
Arc::clone(&metrics_registry_v2),
skip_replay,
),
LoaderEmptyInput::new(Arc::clone(&metric_registry), skip_replay),
)
.await
{
@ -86,15 +76,8 @@ pub async fn load_or_create_preserved_catalog(
db_name
);
create_preserved_catalog(
db_name,
Arc::clone(&iox_object_store),
server_id,
Arc::clone(&metrics_registry),
metrics_registry_v2,
skip_replay,
)
.await
create_preserved_catalog(Arc::clone(&iox_object_store), metric_registry, skip_replay)
.await
}
Err(e) => {
if wipe_on_error {
@ -107,11 +90,8 @@ pub async fn load_or_create_preserved_catalog(
.context(CannotWipeCatalog)?;
create_preserved_catalog(
db_name,
Arc::clone(&iox_object_store),
server_id,
Arc::clone(&metrics_registry),
metrics_registry_v2,
metric_registry,
skip_replay,
)
.await
@ -126,22 +106,13 @@ pub async fn load_or_create_preserved_catalog(
///
/// This will fail if a preserved catalog already exists.
pub async fn create_preserved_catalog(
db_name: &str,
iox_object_store: Arc<IoxObjectStore>,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
metrics_registry_v2: Arc<metric::Registry>,
metric_registry: Arc<metric::Registry>,
skip_replay: bool,
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
let (preserved_catalog, loader) = PreservedCatalog::new_empty(
Arc::clone(&iox_object_store),
LoaderEmptyInput::new(
db_name,
server_id,
metrics_registry,
metrics_registry_v2,
skip_replay,
),
LoaderEmptyInput::new(metric_registry, skip_replay),
)
.await
.context(CannotCreateCatalog)?;
@ -159,29 +130,14 @@ pub async fn create_preserved_catalog(
/// All input required to create an empty [`Loader`]
#[derive(Debug)]
struct LoaderEmptyInput {
metrics_registry: Arc<::metrics::MetricRegistry>,
metrics_registry_v2: Arc<::metric::Registry>,
metric_attributes: Vec<KeyValue>,
metric_registry: Arc<::metric::Registry>,
skip_replay: bool,
}
impl LoaderEmptyInput {
fn new(
db_name: &str,
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
metrics_registry_v2: Arc<metric::Registry>,
skip_replay: bool,
) -> Self {
let metric_attributes = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)),
];
fn new(metric_registry: Arc<metric::Registry>, skip_replay: bool) -> Self {
Self {
metrics_registry,
metrics_registry_v2,
metric_attributes,
metric_registry,
skip_replay,
}
}
@ -192,7 +148,7 @@ impl LoaderEmptyInput {
struct Loader {
catalog: Catalog,
planner: Option<ReplayPlanner>,
metrics_registry_v2: Arc<metric::Registry>,
metric_registry: Arc<metric::Registry>,
}
impl CatalogState for Loader {
@ -200,14 +156,9 @@ impl CatalogState for Loader {
fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self {
Self {
catalog: Catalog::new(
Arc::from(db_name),
data.metrics_registry,
Arc::clone(&data.metrics_registry_v2),
data.metric_attributes,
),
catalog: Catalog::new(Arc::from(db_name), Arc::clone(&data.metric_registry)),
planner: (!data.skip_replay).then(ReplayPlanner::new),
metrics_registry_v2: Arc::new(Default::default()),
metric_registry: Arc::new(Default::default()),
}
}
@ -241,7 +192,7 @@ impl CatalogState for Loader {
}
// Create a parquet chunk for this chunk
let metrics = ParquetChunkMetrics::new(self.metrics_registry_v2.as_ref());
let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
let parquet_chunk = ParquetChunk::new(
&info.path,
iox_object_store,
@ -319,7 +270,7 @@ impl CatalogState for Loader {
mod tests {
use super::*;
use crate::db::checkpoint_data_from_catalog;
use data_types::DatabaseName;
use data_types::{server_id::ServerId, DatabaseName};
use object_store::ObjectStore;
use parquet_file::catalog::{
api::CheckpointData,
@ -348,8 +299,6 @@ mod tests {
load_or_create_preserved_catalog(
&db_name,
iox_object_store,
server_id,
Default::default(),
Default::default(),
true,
false,
@ -364,11 +313,8 @@ mod tests {
#[tokio::test]
async fn test_catalog_state() {
let metrics_registry = Arc::new(::metrics::MetricRegistry::new());
let empty_input = LoaderEmptyInput {
metrics_registry,
metrics_registry_v2: Default::default(),
metric_attributes: vec![],
metric_registry: Default::default(),
skip_replay: false,
};
assert_catalog_state_implementation::<Loader, _>(empty_input, checkpoint_data_from_loader)

View File

@ -24,11 +24,11 @@ pub struct JobRegistryInner {
}
impl JobRegistry {
pub fn new(metric_registry_v2: Arc<metric::Registry>) -> Self {
pub fn new(metric_registry: Arc<metric::Registry>) -> Self {
Self {
inner: Mutex::new(JobRegistryInner {
registry: TaskRegistryWithHistory::new(JOB_HISTORY_SIZE),
metrics: JobRegistryMetrics::new(metric_registry_v2),
metrics: JobRegistryMetrics::new(metric_registry),
}),
}
}
@ -102,17 +102,17 @@ struct JobRegistryMetrics {
}
impl JobRegistryMetrics {
fn new(metric_registry_v2: Arc<metric::Registry>) -> Self {
fn new(metric_registry: Arc<metric::Registry>) -> Self {
Self {
active_gauge: metric_registry_v2
active_gauge: metric_registry
.register_metric("influxdb_iox_job_count", "Number of known jobs"),
completed_accu: Default::default(),
cpu_time_histogram: metric_registry_v2.register_metric_with_options(
cpu_time_histogram: metric_registry.register_metric_with_options(
"influxdb_iox_job_completed_cpu",
"CPU time of of completed jobs",
Self::duration_histogram_options,
),
wall_time_histogram: metric_registry_v2.register_metric_with_options(
wall_time_histogram: metric_registry.register_metric_with_options(
"influxdb_iox_job_completed_wall",
"Wall time of of completed jobs",
Self::duration_histogram_options,

View File

@ -71,7 +71,7 @@
use async_trait::async_trait;
use chrono::Utc;
use data_types::{
database_rules::{NodeGroup, RoutingRules, ShardConfig, ShardId, Sink},
database_rules::{NodeGroup, RoutingRules, ShardId, Sink},
deleted_database::DeletedDatabase,
error::ErrorLogger,
job::Job,
@ -79,7 +79,7 @@ use data_types::{
{DatabaseName, DatabaseNameError},
};
use database::{Database, DatabaseConfig};
use entry::{lines_to_sharded_entries, pb_to_entry, Entry, ShardedEntry};
use entry::{lines_to_sharded_entries, pb_to_entry, Entry};
use futures::future::{BoxFuture, Future, FutureExt, Shared, TryFutureExt};
use generated_types::influxdata::pbdata::v1 as pb;
use hashbrown::HashMap;
@ -203,8 +203,10 @@ pub enum Error {
))]
WritingOnlyAllowedThroughWriteBuffer { db_name: String },
#[snafu(display("Cannot write to write buffer, bytes {}: {}", bytes, source))]
WriteBuffer { source: db::Error, bytes: u64 },
#[snafu(display("Cannot write to write buffer: {}", source))]
WriteBuffer {
source: Box<dyn std::error::Error + Sync + Send>,
},
#[snafu(display("no remote configured for node group: {:?}", node_group))]
NoRemoteConfigured { node_group: NodeGroup },
@ -258,52 +260,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
}
/// A collection of metrics used to instrument the Server.
#[derive(Debug)]
pub struct ServerMetrics {
/// The number of LP lines ingested
pub ingest_lines_total: metrics::Counter,
/// The number of LP fields ingested
pub ingest_fields_total: metrics::Counter,
/// The number of LP bytes ingested
pub ingest_points_bytes_total: metrics::Counter,
/// The number of Entry bytes ingested
pub ingest_entries_bytes_total: metrics::Counter,
}
impl ServerMetrics {
pub fn new(registry: Arc<metrics::MetricRegistry>) -> Self {
// Server manages multiple domains.
let ingest_domain = registry.register_domain("ingest");
Self {
ingest_lines_total: ingest_domain.register_counter_metric(
"points",
None,
"total LP points ingested",
),
ingest_fields_total: ingest_domain.register_counter_metric(
"fields",
None,
"total LP field values ingested",
),
ingest_points_bytes_total: ingest_domain.register_counter_metric(
"points",
Some("bytes"),
"total LP points bytes ingested",
),
ingest_entries_bytes_total: ingest_domain.register_counter_metric(
"entries",
Some("bytes"),
"total Entry bytes ingested",
),
}
}
}
/// Configuration options for `Server`
#[derive(Debug)]
pub struct ServerConfig {
@ -331,8 +287,6 @@ impl Default for ServerConfig {
pub struct Server<M: ConnectionManager> {
connection_manager: Arc<M>,
metrics: Arc<ServerMetrics>,
/// Future that resolves when the background worker exits
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
@ -515,10 +469,6 @@ where
application: Arc<ApplicationState>,
config: ServerConfig,
) -> Self {
let metrics = Arc::new(ServerMetrics::new(Arc::clone(
application.metric_registry(),
)));
let shared = Arc::new(ServerShared {
shutdown: Default::default(),
application,
@ -533,7 +483,6 @@ where
let join = handle.map_err(Arc::new).boxed().shared();
Self {
metrics,
shared,
join,
connection_manager: Arc::new(connection_manager),
@ -562,13 +511,6 @@ where
Ok(())
}
/// Return the metrics associated with this server
///
/// TODO: Server should record its own metrics
pub fn metrics(&self) -> &Arc<ServerMetrics> {
&self.metrics
}
/// Returns the server id for this server if set
pub fn server_id(&self) -> Option<ServerId> {
self.shared.state.read().server_id()
@ -786,85 +728,40 @@ where
default_time: i64,
) -> Result<()> {
let db = self.db(db_name)?;
let rules = db.rules();
// need to split this in two blocks because we cannot hold a lock across an async call.
let routing_config_target = {
let rules = db.rules();
if let Some(RoutingRules::RoutingConfig(routing_config)) = &rules.routing_rules {
let sharded_entries = lines_to_sharded_entries(
lines,
default_time,
None as Option<&ShardConfig>,
&*rules,
)
let shard_config = match &rules.routing_rules {
Some(RoutingRules::ShardConfig(shard_config)) => Some(shard_config),
_ => None,
};
let sharded_entries =
lines_to_sharded_entries(lines, default_time, shard_config, rules.as_ref())
.context(LineConversion)?;
Some((routing_config.sink.clone(), sharded_entries))
} else {
None
}
};
if let Some((sink, sharded_entries)) = routing_config_target {
for i in sharded_entries {
self.write_entry_sink(db_name, &sink, i.entry).await?;
}
return Ok(());
}
// Split lines into shards while holding a read lock on the sharding config.
// Once the lock is released we have a vector of entries, each associated with a
// shard id, and an Arc to the mapping between shard ids and node
// groups. This map is atomically replaced every time the sharding
// config is updated, hence it's safe to use after we release the shard config
// lock.
let (sharded_entries, shards) = {
let rules = db.rules();
let shard_config = rules.routing_rules.as_ref().map(|cfg| match cfg {
RoutingRules::RoutingConfig(_) => unreachable!("routing config handled above"),
RoutingRules::ShardConfig(shard_config) => shard_config,
});
let sharded_entries =
lines_to_sharded_entries(lines, default_time, shard_config, &*rules)
.context(LineConversion)?;
let shards = shard_config
.as_ref()
.map(|cfg| Arc::clone(&cfg.shards))
.unwrap_or_default();
(sharded_entries, shards)
};
// Write to all shards in parallel; as soon as one fails return error
// immediately to the client and abort all other outstanding requests.
// This can take some time, but we're no longer holding the lock to the shard
// config.
futures_util::future::try_join_all(
sharded_entries
.into_iter()
.map(|e| self.write_sharded_entry(db_name, Arc::clone(&shards), e)),
)
futures_util::future::try_join_all(sharded_entries.into_iter().map(
|sharded_entry| async {
let sink = match &rules.routing_rules {
Some(RoutingRules::ShardConfig(shard_config)) => {
let id = sharded_entry.shard_id.expect("sharded entry");
Some(shard_config.shards.get(&id).expect("valid shard"))
}
Some(RoutingRules::RoutingConfig(config)) => Some(&config.sink),
None => None,
};
match sink {
Some(sink) => {
self.write_entry_sink(db_name, sink, sharded_entry.entry)
.await
}
None => self.write_entry_local(db_name, sharded_entry.entry).await,
}
},
))
.await?;
Ok(())
}
async fn write_sharded_entry(
&self,
db_name: &DatabaseName<'_>,
shards: Arc<std::collections::HashMap<u32, Sink>>,
sharded_entry: ShardedEntry,
) -> Result<()> {
match sharded_entry.shard_id {
Some(shard_id) => {
let sink = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
self.write_entry_sink(db_name, sink, sharded_entry.entry)
.await?
}
None => self.write_entry_local(db_name, sharded_entry.entry).await?,
}
Ok(())
}
@ -936,49 +833,28 @@ where
/// Write an entry to the local `Db`
///
/// TODO: Push this out of `Server` into `Database`
/// TODO: Remove this and migrate callers to `Database::write_entry`
pub async fn write_entry_local(&self, db_name: &DatabaseName<'_>, entry: Entry) -> Result<()> {
let database = self.active_database(db_name)?;
let db = {
let initialized = database
.initialized()
.context(DatabaseNotInitialized { db_name })?;
use database::WriteError;
if initialized.write_buffer_consumer().is_some() {
return WritingOnlyAllowedThroughWriteBuffer { db_name }.fail();
}
Arc::clone(initialized.db())
};
let bytes = entry.data().len() as u64;
db.store_entry(entry, Utc::now()).await.map_err(|e| {
self.metrics.ingest_entries_bytes_total.add_with_attributes(
bytes,
&[
metrics::KeyValue::new("status", "error"),
metrics::KeyValue::new("db_name", db_name.to_string()),
],
);
match e {
db::Error::HardLimitReached {} => Error::HardLimitReached {},
db::Error::WriteBufferWritingError { .. } => {
Error::WriteBuffer { source: e, bytes }
}
_ => Error::UnknownDatabaseError {
source: Box::new(e),
self.active_database(db_name)?
.write_entry(entry, Utc::now())
.await
.map_err(|e| match e {
WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
db_name: db_name.to_string(),
},
}
})?;
self.metrics.ingest_entries_bytes_total.add_with_attributes(
bytes,
&[
metrics::KeyValue::new("status", "ok"),
metrics::KeyValue::new("db_name", db_name.to_string()),
],
);
Ok(())
WriteError::WriteBuffer { source } => Error::WriteBuffer { source },
WriteError::WritingOnlyAllowedThroughWriteBuffer => {
Error::WritingOnlyAllowedThroughWriteBuffer {
db_name: db_name.to_string(),
}
}
WriteError::DbError { source } => Error::UnknownDatabaseError {
source: Box::new(source),
},
WriteError::HardLimitReached { .. } => Error::HardLimitReached {},
})
}
/// Update database rules and save on success.
@ -1273,7 +1149,7 @@ mod tests {
use futures::TryStreamExt;
use influxdb_line_protocol::parse_lines;
use iox_object_store::IoxObjectStore;
use metrics::TestMetricRegistry;
use metric::{Attributes, Metric, U64Counter};
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath},
ObjectStore, ObjectStoreApi,
@ -1546,7 +1422,7 @@ mod tests {
#[tokio::test]
async fn write_entry_local() {
let application = make_application();
let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry()));
let registry = Arc::clone(application.metric_registry());
let server = make_server(application);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
@ -1587,12 +1463,13 @@ mod tests {
];
assert_batches_eq!(expected, &batches);
metric_registry
.has_metric_family("ingest_entries_bytes_total")
.with_attributes(&[("status", "ok"), ("db_name", "foo")])
.counter()
.eq(240.0)
.unwrap();
let bytes = registry
.get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
.unwrap()
.get_observer(&Attributes::from(&[("status", "ok"), ("db_name", "foo")]))
.unwrap()
.fetch();
assert_eq!(bytes, 240)
}
// This tests sets up a database with a sharding config which defines exactly one shard
@ -1647,11 +1524,9 @@ mod tests {
shards: vec![TEST_SHARD_ID].into(),
..Default::default()
}),
shards: Arc::new(
vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
.into_iter()
.collect(),
),
shards: vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
.into_iter()
.collect(),
..Default::default()
};
@ -2469,7 +2344,7 @@ mod tests {
application.job_registry().reclaim();
let mut reporter = metric::RawReporter::default();
application.metric_registry_v2().report(&mut reporter);
application.metric_registry().report(&mut reporter);
server.shutdown();
server.join().await.unwrap();

View File

@ -16,13 +16,12 @@ use query::{exec::Executor, QueryDatabase};
use std::{borrow::Cow, convert::TryFrom, num::NonZeroU32, sync::Arc, time::Duration};
use write_buffer::core::WriteBufferWriting;
// A wrapper around a Db and a metrics registry allowing for isolated testing
// A wrapper around a Db and a metric registry allowing for isolated testing
// of a Db and its metrics.
#[derive(Debug)]
pub struct TestDb {
pub db: Arc<Db>,
pub metric_registry: metrics::TestMetricRegistry,
pub metrics_registry_v2: Arc<metric::Registry>,
pub metric_registry: Arc<metric::Registry>,
pub replay_plan: ReplayPlan,
}
@ -80,15 +79,12 @@ impl TestDbBuilder {
target_query_partitions: 4,
}));
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let metrics_registry_v2 = Arc::new(metric::Registry::new());
let metric_registry = Arc::new(metric::Registry::new());
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
db_name.as_str(),
Arc::clone(&iox_object_store),
server_id,
Arc::clone(&metric_registry),
Arc::clone(&metrics_registry_v2),
false,
false,
)
@ -126,12 +122,11 @@ impl TestDbBuilder {
catalog,
write_buffer_producer: self.write_buffer_producer,
exec,
metrics_registry_v2: Arc::clone(&metrics_registry_v2),
metric_registry: Arc::clone(&metric_registry),
};
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metric_registry),
metrics_registry_v2,
metric_registry,
db: Db::new(
database_to_commit,
Arc::new(JobRegistry::new(Default::default())),

View File

@ -318,7 +318,7 @@ mod tests {
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
test_db.metrics_registry_v2.as_ref(),
test_db.metric_registry.as_ref(),
);
let query = "select * from cpu";
@ -345,7 +345,7 @@ mod tests {
consumer.shutdown();
consumer.join().await.unwrap();
let metrics = test_db.metrics_registry_v2;
let metrics = test_db.metric_registry;
let observation = metrics
.get_instrument::<Metric<U64Counter>>("write_buffer_ingest_requests")
.unwrap()
@ -489,7 +489,7 @@ mod tests {
let consumer = WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state, None).unwrap()),
Arc::clone(&db),
test_db.metrics_registry_v2.as_ref(),
test_db.metric_registry.as_ref(),
);
// after a while the table should exist
@ -530,7 +530,7 @@ mod tests {
let test_db = TestDb::builder().build().await;
let db = Arc::new(test_db.db);
let metric_registry = test_db.metrics_registry_v2;
let metric_registry = test_db.metric_registry;
// do: start background task loop
let shutdown: CancellationToken = Default::default();

View File

@ -181,7 +181,7 @@ pub async fn main(config: Config) -> Result<()> {
// Register jemalloc metrics
application
.metric_registry_v2()
.metric_registry()
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
let app_server = make_server(Arc::clone(&application), &config);

View File

@ -18,6 +18,8 @@ mod pprof;
mod tower;
mod metrics;
// Influx crates
use super::planner::Planner;
use data_types::{
@ -40,6 +42,7 @@ use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use crate::influxdb_ioxd::http::metrics::LineProtocolMetrics;
use hyper::server::conn::AddrIncoming;
use std::num::NonZeroI32;
use std::{
@ -351,6 +354,7 @@ where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
app_server: Arc<AppServer<M>>,
lp_metrics: Arc<LineProtocolMetrics>,
max_request_size: usize,
}
@ -365,6 +369,9 @@ where
let server = Server {
app_server,
max_request_size,
lp_metrics: Arc::new(LineProtocolMetrics::new(
application.metric_registry().as_ref(),
)),
};
// Create a router and specify the the handlers.
@ -486,10 +493,13 @@ where
{
let Server {
app_server: server,
lp_metrics,
max_request_size,
} = req.data::<Server<M>>().expect("server state");
let max_request_size = *max_request_size;
let server = Arc::clone(server);
let lp_metrics = Arc::clone(lp_metrics);
let query = req.uri().query().context(ExpectedQueryString)?;
@ -522,74 +532,31 @@ where
let num_lines = lines.len();
debug!(num_lines, num_fields, body_size=body.len(), %db_name, org=%write_info.org, bucket=%write_info.bucket, "inserting lines into database");
server
.write_lines(&db_name, &lines, default_time)
.await
.map_err(|e| {
let attributes = &[
metrics::KeyValue::new("status", "error"),
metrics::KeyValue::new("db_name", db_name.to_string()),
];
server
.metrics()
.ingest_lines_total
.add_with_attributes(num_lines as u64, attributes);
server
.metrics()
.ingest_fields_total
.add_with_attributes(num_fields as u64, attributes);
server
.metrics()
.ingest_points_bytes_total
.add_with_attributes(
body.len() as u64,
&[
metrics::KeyValue::new("status", "error"),
metrics::KeyValue::new("db_name", db_name.to_string()),
],
);
debug!(?e, ?db_name, ?num_lines, "error writing lines");
match e {
server::Error::DatabaseNotFound { .. } => ApplicationError::DatabaseNotFound {
db_name: db_name.to_string(),
},
_ => ApplicationError::WritingPoints {
org: write_info.org.clone(),
bucket_name: write_info.bucket.clone(),
source: Box::new(e),
},
}
})?;
let attributes = &[
metrics::KeyValue::new("status", "ok"),
metrics::KeyValue::new("db_name", db_name.to_string()),
];
server
.metrics()
.ingest_lines_total
.add_with_attributes(num_lines as u64, attributes);
server
.metrics()
.ingest_fields_total
.add_with_attributes(num_fields as u64, attributes);
// line protocol bytes successfully written
server
.metrics()
.ingest_points_bytes_total
.add_with_attributes(body.len() as u64, attributes);
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap())
match server.write_lines(&db_name, &lines, default_time).await {
Ok(_) => {
lp_metrics.record_write(&db_name, lines.len(), num_fields, body.len(), true);
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap())
}
Err(server::Error::DatabaseNotFound { .. }) => {
debug!(%db_name, %num_lines, "database not found");
// Purposefully do not record ingest metrics
Err(ApplicationError::DatabaseNotFound {
db_name: db_name.to_string(),
})
}
Err(e) => {
debug!(%e, %db_name, %num_lines, "error writing lines");
lp_metrics.record_write(&db_name, lines.len(), num_fields, body.len(), false);
Err(ApplicationError::WritingPoints {
org: write_info.org.clone(),
bucket_name: write_info.bucket.clone(),
source: Box::new(e),
})
}
}
}
#[derive(Deserialize, Debug, PartialEq)]
@ -670,13 +637,9 @@ async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
.data::<Arc<ApplicationState>>()
.expect("application state");
let mut body = application.metric_registry().metrics_as_text();
// Prometheus does not require that metrics are output in order and so can concat exports
// This will break if the same metric is published from two different exporters, but this
// is a temporary way to avoid migrating all metrics in one go
let mut body: Vec<u8> = Default::default();
let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body);
application.metric_registry_v2().report(&mut reporter);
application.metric_registry().report(&mut reporter);
Ok(Response::new(Body::from(body)))
}
@ -904,7 +867,7 @@ pub async fn serve<M>(
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
let metric_registry = Arc::clone(application.metric_registry_v2());
let metric_registry = Arc::clone(application.metric_registry());
let router = router(application, server, max_request_size);
let new_service = tower::MakeService::new(router, trace_collector, metric_registry);
@ -927,8 +890,7 @@ mod tests {
use reqwest::{Client, Response};
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use metric::{Attributes, DurationHistogram, Metric};
use metrics::TestMetricRegistry;
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use object_store::ObjectStore;
use serde::de::DeserializeOwned;
use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, ConnectionManagerImpl};
@ -969,7 +931,7 @@ mod tests {
let application = make_application();
let metric: Metric<U64Counter> = application
.metric_registry_v2()
.metric_registry()
.register_metric("my_metric", "description");
let app_server = make_server(Arc::clone(&application));
@ -1091,8 +1053,7 @@ mod tests {
async fn test_write_metrics() {
let application = make_application();
let app_server = make_server(Arc::clone(&application));
let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry()));
let metric_registry_v2 = Arc::clone(application.metric_registry_v2());
let metric_registry = Arc::clone(application.metric_registry());
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.wait_for_init().await.unwrap();
@ -1106,22 +1067,52 @@ mod tests {
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1568756160";
let incompatible_lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=\"incompatible\" 1568756170";
// send good data
let org_name = "MetricsOrg";
let bucket_name = "MetricsBucket";
let post_url = format!(
"{}/api/v2/write?bucket={}&org={}",
server_url, bucket_name, org_name
);
client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
server_url, bucket_name, org_name
))
.post(&post_url)
.body(lp_data)
.send()
.await
.expect("sent data");
// The request completed successfully
let request_count = metric_registry_v2
let request_count = metric_registry
.get_instrument::<Metric<U64Counter>>("http_requests")
.unwrap();
let request_count_ok = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "ok"),
]))
.unwrap()
.clone();
let request_count_client_error = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "client_error"),
]))
.unwrap()
.clone();
let request_count_server_error = request_count
.get_observer(&Attributes::from(&[
("path", "/api/v2/write"),
("status", "server_error"),
]))
.unwrap()
.clone();
let request_duration_ok = metric_registry
.get_instrument::<Metric<DurationHistogram>>("http_request_duration")
.unwrap()
.get_observer(&Attributes::from(&[
@ -1129,36 +1120,84 @@ mod tests {
("status", "ok"),
]))
.unwrap()
.fetch()
.sample_count();
.clone();
assert_eq!(request_count, 1);
let entry_ingest = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_entries_bytes")
.unwrap();
let entry_ingest_ok = entry_ingest
.get_observer(&Attributes::from(&[
("db_name", "MetricsOrg_MetricsBucket"),
("status", "ok"),
]))
.unwrap()
.clone();
let entry_ingest_error = entry_ingest
.get_observer(&Attributes::from(&[
("db_name", "MetricsOrg_MetricsBucket"),
("status", "error"),
]))
.unwrap()
.clone();
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 0);
assert_ne!(entry_ingest_ok.fetch(), 0);
assert_eq!(entry_ingest_error.fetch(), 0);
// A single successful point landed
metric_registry
.has_metric_family("ingest_points_total")
.with_attributes(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")])
.counter()
.eq(1.0)
let ingest_lines = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_lines")
.unwrap();
let ingest_lines_ok = ingest_lines
.get_observer(&Attributes::from(&[
("db_name", "MetricsOrg_MetricsBucket"),
("status", "ok"),
]))
.unwrap()
.clone();
let ingest_lines_error = ingest_lines
.get_observer(&Attributes::from(&[
("db_name", "MetricsOrg_MetricsBucket"),
("status", "error"),
]))
.unwrap()
.clone();
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 0);
// Which consists of two fields
metric_registry
.has_metric_family("ingest_fields_total")
.with_attributes(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")])
.counter()
.eq(2.0)
.unwrap();
let observation = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_fields")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MetricsOrg_MetricsBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 2);
// Bytes of data were written
metric_registry
.has_metric_family("ingest_points_bytes_total")
.with_attributes(&[("status", "ok"), ("db_name", "MetricsOrg_MetricsBucket")])
.counter()
.eq(98.0)
.unwrap();
let observation = metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_bytes")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "MetricsOrg_MetricsBucket"),
("status", "ok"),
]))
.unwrap()
.fetch();
assert_eq!(observation, 98);
// Generate an error
// Write to a non-existent database
client
.post(&format!(
"{}/api/v2/write?bucket=NotMyBucket&org=NotMyOrg",
@ -1169,13 +1208,36 @@ mod tests {
.await
.unwrap();
// A single point was rejected
metric_registry
.has_metric_family("ingest_points_total")
.with_attributes(&[("db_name", "NotMyOrg_NotMyBucket"), ("status", "error")])
.counter()
.eq(1.0)
// An invalid database should not be reported as a new metric
assert!(metric_registry
.get_instrument::<Metric<U64Counter>>("ingest_lines")
.unwrap()
.get_observer(&Attributes::from(&[
("db_name", "NotMyOrg_NotMyBucket"),
("status", "error"),
]))
.is_none());
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 0);
// Perform an invalid write
client
.post(&post_url)
.body(incompatible_lp_data)
.send()
.await
.unwrap();
// This currently results in an InternalServerError which is correctly recorded
// as a server error, but this should probably be a BadRequest client error (#2538)
assert_eq!(ingest_lines_ok.fetch(), 1);
assert_eq!(ingest_lines_error.fetch(), 1);
assert_eq!(request_duration_ok.fetch().sample_count(), 1);
assert_eq!(request_count_ok.fetch(), 1);
assert_eq!(request_count_client_error.fetch(), 0);
assert_eq!(request_count_server_error.fetch(), 1);
assert_ne!(entry_ingest_ok.fetch(), 0);
assert_ne!(entry_ingest_error.fetch(), 0);
}
/// Sets up a test database with some data for testing the query endpoint

View File

@ -0,0 +1,115 @@
use hashbrown::HashMap;
use metric::{Attributes, Metric, U64Counter};
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
/// Line protocol ingest metrics
///
/// Tracks the number of lines, field values and bytes written via the
/// line-protocol ingest path. Counts are recorded per database and per
/// outcome status ("ok" / "error") — see `LineProtocolDatabaseMetrics`.
#[derive(Debug)]
pub struct LineProtocolMetrics {
    /// The number of LP lines ingested
    ingest_lines: Metric<U64Counter>,
    /// The number of LP fields ingested
    ingest_fields: Metric<U64Counter>,
    /// The number of LP bytes ingested
    ingest_bytes: Metric<U64Counter>,
    /// Database metrics keyed by database name
    ///
    /// Caches the per-database counter recorders so repeated writes to the
    /// same database reuse them instead of re-resolving attribute sets.
    databases: Mutex<HashMap<String, LineProtocolDatabaseMetrics>>,
}
/// Line protocol metrics for a given database
///
/// Holds one pre-resolved counter recorder per (counter, status)
/// combination. Instances are created lazily the first time a write is
/// recorded for a database, so only an invalid (never-successfully-written)
/// database produces no entry at all.
#[derive(Debug)]
struct LineProtocolDatabaseMetrics {
    /// The number of LP lines ingested successfully
    ingest_lines_ok: U64Counter,
    /// The number of LP lines ingested unsuccessfully
    ingest_lines_error: U64Counter,
    /// The number of LP fields ingested successfully
    ingest_fields_ok: U64Counter,
    /// The number of LP fields ingested unsuccessfully
    ingest_fields_error: U64Counter,
    /// The number of LP bytes ingested successfully
    ingest_bytes_ok: U64Counter,
    /// The number of LP bytes ingested unsuccessfully
    ingest_bytes_error: U64Counter,
}
impl LineProtocolMetrics {
    /// Registers the three line-protocol ingest metrics against `registry`
    /// and returns a tracker with an initially empty per-database cache.
    pub fn new(registry: &metric::Registry) -> Self {
        let ingest_lines = registry.register_metric("ingest_lines", "total LP points ingested");
        let ingest_fields =
            registry.register_metric("ingest_fields", "total LP field values ingested");
        let ingest_bytes = registry.register_metric("ingest_bytes", "total LP bytes ingested");

        Self {
            ingest_lines,
            ingest_fields,
            ingest_bytes,
            databases: Default::default(),
        }
    }

    /// Records the outcome of a single line-protocol write for `db_name`,
    /// incrementing either the "ok" or the "error" counters.
    pub fn record_write(
        &self,
        db_name: &str,
        lines: usize,
        fields: usize,
        bytes: usize,
        success: bool,
    ) {
        let db = self.database_metrics(db_name);
        if success {
            db.ingest_lines_ok.inc(lines as u64);
            db.ingest_fields_ok.inc(fields as u64);
            db.ingest_bytes_ok.inc(bytes as u64);
        } else {
            db.ingest_lines_error.inc(lines as u64);
            db.ingest_fields_error.inc(fields as u64);
            db.ingest_bytes_error.inc(bytes as u64);
        }
    }

    /// Returns the per-database metrics for `db_name`, creating and caching
    /// them on first use.
    ///
    /// Uses hashbrown's raw-entry API so the lookup avoids allocating an
    /// owned key on the hit path. The returned guard keeps the database
    /// map locked for its lifetime.
    fn database_metrics(&self, db_name: &str) -> MappedMutexGuard<'_, LineProtocolDatabaseMetrics> {
        MutexGuard::map(self.databases.lock(), |map| {
            map.raw_entry_mut()
                .from_key(db_name)
                .or_insert_with(|| {
                    (
                        db_name.to_string(),
                        LineProtocolDatabaseMetrics::new(self, db_name),
                    )
                })
                .1
        })
    }
}
impl LineProtocolDatabaseMetrics {
    /// Resolves the six counter recorders for `db_name` — one per
    /// (counter, status) combination — from the shared metrics.
    fn new(metrics: &LineProtocolMetrics, db_name: &str) -> Self {
        // Build an attribute set tagged with this database and the given status.
        let attributes = |status: &'static str| {
            let mut attributes = Attributes::from([("db_name", db_name.to_string().into())]);
            attributes.insert("status", status);
            attributes
        };

        let ok = attributes("ok");
        let error = attributes("error");

        Self {
            ingest_lines_ok: metrics.ingest_lines.recorder(ok.clone()),
            ingest_fields_ok: metrics.ingest_fields.recorder(ok.clone()),
            ingest_bytes_ok: metrics.ingest_bytes.recorder(ok),
            ingest_lines_error: metrics.ingest_lines.recorder(error.clone()),
            ingest_fields_error: metrics.ingest_fields.recorder(error.clone()),
            ingest_bytes_error: metrics.ingest_bytes.recorder(error),
        }
    }
}

View File

@ -105,7 +105,7 @@ where
let builder = tonic::transport::Server::builder();
let mut builder = builder.layer(trace_http::tower::TraceLayer::new(
Arc::clone(application.metric_registry_v2()),
Arc::clone(application.metric_registry()),
trace_collector,
true,
));