Merge branch 'main' into jgm-fix-perf

pull/24376/head
kodiakhq[bot] 2021-09-09 15:19:18 +00:00 committed by GitHub
commit 02b5361b7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 101 additions and 299 deletions

1
Cargo.lock generated
View File

@ -2206,6 +2206,7 @@ dependencies = [
"metric",
"observability_deps",
"prometheus",
"test_helpers",
]
[[package]]

View File

@ -1,7 +1,7 @@
//! This module contains helper code for building `Entry` from line protocol and the
//! `DatabaseRules` configuration.
use std::{collections::BTreeMap, convert::TryFrom, fmt::Formatter, num::NonZeroU64};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Formatter};
use chrono::{DateTime, TimeZone, Utc};
use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset};
@ -1697,43 +1697,6 @@ enum ColumnRaw<'a> {
Bool(Vec<bool>),
}
/// A non-zero logical clock reading; cheap to copy and ordered, so readings
/// can be compared directly.
#[derive(Debug, PartialOrd, PartialEq, Copy, Clone)]
pub struct ClockValue(NonZeroU64);

impl ClockValue {
    /// Wrap an already-validated non-zero value.
    pub fn new(v: NonZeroU64) -> Self {
        ClockValue(v)
    }

    /// The wrapped non-zero value.
    pub fn get(&self) -> NonZeroU64 {
        self.0
    }

    /// The wrapped value as a plain `u64`.
    pub fn get_u64(&self) -> u64 {
        self.get().get()
    }
}
impl TryFrom<u64> for ClockValue {
type Error = ClockValueError;
fn try_from(value: u64) -> Result<Self, Self::Error> {
NonZeroU64::new(value)
.map(Self)
.context(ValueMayNotBeZero)
.map_err(Into::into)
}
}
/// Opaque error returned when constructing a [`ClockValue`] from an invalid input.
#[derive(Debug, Snafu)]
pub struct ClockValueError(InnerClockValueError);
// Internal error detail, kept private so the public error type stays opaque.
#[derive(Debug, Snafu)]
enum InnerClockValueError {
#[snafu(display("Clock values must not be zero"))]
ValueMayNotBeZero,
}
#[derive(Debug, Clone)]
pub struct SequencedEntry {
entry: Entry,

View File

@ -11,3 +11,4 @@ metric = { path = "../metric" }
prometheus = "0.12"
[dev-dependencies] # In alphabetical order
test_helpers = { path = "../test_helpers" }

View File

@ -29,7 +29,9 @@ use prometheus::{
///
#[derive(Debug)]
pub struct PrometheusTextEncoder<'a, W: Write> {
metric: Option<MetricFamily>,
/// metric family together with a flag indicating that it was used
metric: Option<(MetricFamily, bool)>,
encoder: TextEncoder,
writer: &'a mut W,
}
@ -72,15 +74,13 @@ impl<'a, W: Write> metric::Reporter for PrometheusTextEncoder<'a, W> {
metric.set_help(description.to_string());
metric.set_field_type(metric_type);
self.metric = Some(metric)
self.metric = Some((metric, false))
}
fn report_observation(&mut self, attributes: &Attributes, observation: Observation) {
let metrics = self
.metric
.as_mut()
.expect("no metric in progress")
.mut_metric();
let (metrics, used) = self.metric.as_mut().expect("no metric in progress");
let metrics = metrics.mut_metric();
let mut metric = Metric::new();
@ -157,11 +157,18 @@ impl<'a, W: Write> metric::Reporter for PrometheusTextEncoder<'a, W> {
metric.set_histogram(histogram)
}
};
metrics.push(metric)
metrics.push(metric);
*used = true;
}
fn finish_metric(&mut self) {
if let Some(family) = self.metric.take() {
if let Some((family, used)) = self.metric.take() {
if !used {
// just don't report the metric
return;
}
match self.encoder.encode(&[family], self.writer) {
Ok(_) => {}
Err(e) => error!(%e, "error encoding metric family"),
@ -174,13 +181,17 @@ impl<'a, W: Write> metric::Reporter for PrometheusTextEncoder<'a, W> {
mod tests {
use super::*;
use metric::{
DurationCounter, DurationGauge, Metric, Registry, U64Counter, U64Histogram,
U64HistogramOptions,
DurationCounter, DurationGauge, DurationHistogram, Metric, Registry, U64Counter,
U64Histogram, U64HistogramOptions,
};
use std::time::Duration;
use test_helpers::assert_not_contains;
#[test]
fn test_encode() {
// tap tracing to check for errors
let tracing_capture = test_helpers::tracing::TracingCapture::new();
let registry = Registry::new();
let counter: Metric<U64Counter> = registry.register_metric("foo", "a counter metric");
@ -220,6 +231,9 @@ mod tests {
.recorder(&[("tag1", "value1")])
.inc(Duration::from_millis(1200));
// unused metrics must not result in an error
let _unused: Metric<DurationHistogram> = registry.register_metric("unused", "unused");
let mut buffer = Vec::new();
let mut encoder = PrometheusTextEncoder::new(&mut buffer);
registry.report(&mut encoder);
@ -254,6 +268,9 @@ foo_total{tag1="value",tag2="value2"} 7
"#
.trim_start();
assert_eq!(&buffer, expected, "{}", buffer)
assert_eq!(&buffer, expected, "{}", buffer);
// no errors
assert_not_contains!(tracing_capture.to_string(), "error");
}
}

View File

@ -67,7 +67,6 @@ mod chunk;
mod lifecycle;
pub mod load;
pub mod pred;
mod process_clock;
mod replay;
mod streams;
mod system_tables;
@ -360,11 +359,6 @@ pub struct Db {
/// Catalog interface for query
catalog_access: Arc<QueryCatalogAccess>,
/// Process clock used in establishing a partial ordering of operations via a Lamport Clock.
///
/// Value is nanoseconds since the Unix Epoch.
process_clock: process_clock::ProcessClock,
/// Number of iterations of the worker lifecycle loop for this Db
worker_iterations_lifecycle: AtomicUsize,
@ -442,8 +436,6 @@ impl Db {
);
let catalog_access = Arc::new(catalog_access);
let process_clock = process_clock::ProcessClock::new();
let this = Self {
rules,
server_id,
@ -454,7 +446,6 @@ impl Db {
jobs,
metrics_registry,
catalog_access,
process_clock,
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_attributes,

View File

@ -1,167 +0,0 @@
//! Process clock used in establishing a partial ordering of operations via a Lamport Clock.
use chrono::Utc;
use entry::ClockValue;
use std::{
convert::{TryFrom, TryInto},
sync::atomic::{AtomicU64, Ordering},
};
/// Process clock used in establishing a partial ordering of operations via a Lamport Clock.
#[derive(Debug)]
pub struct ProcessClock {
// Latest clock value handed out, in nanoseconds since the Unix Epoch
// (initialized from the system clock — see `ProcessClock::new`).
inner: AtomicU64,
}
impl ProcessClock {
/// Create a new process clock value initialized to the current system time.
pub fn new() -> Self {
Self {
inner: AtomicU64::new(system_clock_now()),
}
}
/// Returns the next process clock value, which will be the maximum of the system time in
/// nanoseconds or the previous process clock value plus 1. Every operation that needs a
/// process clock value should be incrementing it as well, so there should never be a read of
/// the process clock without an accompanying increment of at least 1 nanosecond.
///
/// We expect that updates to the process clock are not so frequent and the system is slow
/// enough that the returned value will be incremented by at least 1.
#[allow(dead_code)]
pub fn next(&self) -> ClockValue {
// Retry the compare-and-swap until no concurrent updater raced us.
let next = loop {
if let Ok(next) = self.try_update() {
break next;
}
};
// `next` is at least previous + 1 and the clock starts from the current
// system time, so it cannot be 0 here.
ClockValue::try_from(next).expect("process clock should not be 0")
}
// Attempt a single update of the clock to max(system time now, previous + 1).
// Returns Ok(new value) on success; Err(_) if another thread changed the clock
// between our load and the compare-exchange (the caller retries).
#[allow(dead_code)]
fn try_update(&self) -> Result<u64, u64> {
let now = system_clock_now();
let current_process_clock = self.inner.load(Ordering::SeqCst);
let next_candidate = current_process_clock + 1;
// Taking the max keeps the clock strictly increasing even if the system
// clock jumps backwards.
let next = now.max(next_candidate);
self.inner
.compare_exchange(
current_process_clock,
next,
Ordering::SeqCst,
Ordering::SeqCst,
)
.map(|_| next)
}
}
// Convenience function for getting the current time in a `u64` represented as nanoseconds since
// the epoch
//
// While this might jump backwards, the logic above that takes the maximum of the current process
// clock and the value returned from this function should ensure that the process clock is
// strictly increasing.
fn system_clock_now() -> u64 {
Utc::now()
.timestamp_nanos()
.try_into()
.expect("current time since the epoch should be positive")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::TestDb;
use std::{sync::Arc, thread, time::Duration};
// A freshly-built Db's process clock should sit between system-clock readings
// taken immediately before and after construction.
#[tokio::test]
async fn process_clock_defaults_to_current_time_in_ns() {
let before = system_clock_now();
let db = Arc::new(TestDb::builder().build().await.db);
let db_process_clock = db.process_clock.inner.load(Ordering::SeqCst);
let after = system_clock_now();
assert!(
before < db_process_clock,
"expected {} to be less than {}",
before,
db_process_clock
);
assert!(
db_process_clock < after,
"expected {} to be less than {}",
db_process_clock,
after
);
}
// When the stored clock is ahead of the system time, `next` must still
// advance by exactly 1 per call (the `previous + 1` branch of `try_update`).
#[tokio::test]
async fn next_process_clock_always_increments() {
// Process clock defaults to the current time
let db = Arc::new(TestDb::builder().build().await.db);
// Set the process clock value to a time in the future, so that when compared to the
// current time, the process clock value will be greater
let later: u64 = (Utc::now() + chrono::Duration::weeks(4))
.timestamp_nanos()
.try_into()
.unwrap();
db.process_clock.inner.store(later, Ordering::SeqCst);
// Every call to next_process_clock should increment at least 1, even in this case
// where the system time will be less than the process clock
assert_eq!(
db.process_clock.next(),
ClockValue::try_from(later + 1).unwrap()
);
assert_eq!(
db.process_clock.next(),
ClockValue::try_from(later + 2).unwrap()
);
}
// Hammer the clock from 10 threads; every thread must observe a strictly
// increasing sequence of values, exercising the compare-exchange retry loop.
#[test]
fn process_clock_multithreaded_access_always_increments() {
let pc = Arc::new(ProcessClock::new());
let handles: Vec<_> = (0..10)
.map(|thread_num| {
let pc = Arc::clone(&pc);
thread::spawn(move || {
let mut pc_val_before = pc.next();
for iteration in 0..10 {
let pc_val_after = pc.next();
// This might be useful for debugging if this test fails
println!(
"thread {} in iteration {} testing {:?} < {:?}",
thread_num, iteration, pc_val_before, pc_val_after
);
// Process clock should always increase
assert!(
pc_val_before < pc_val_after,
"expected {:?} to be less than {:?}",
pc_val_before,
pc_val_after
);
pc_val_before = pc_val_after;
// encourage yielding
thread::sleep(Duration::from_millis(1));
}
})
})
.collect();
// Propagate any panic (failed assertion) from the worker threads.
for h in handles {
h.join().unwrap();
}
}
}

View File

@ -5,6 +5,7 @@ use std::{
convert::Infallible,
ops::DerefMut,
sync::Arc,
time::Duration,
};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
@ -106,18 +107,49 @@ impl JobRegistryMetrics {
active_gauge: metric_registry_v2
.register_metric("influxdb_iox_job_count", "Number of known jobs"),
completed_accu: Default::default(),
cpu_time_histogram: metric_registry_v2.register_metric(
"influxdb_iox_job_completed_cpu_nanoseconds",
"CPU time of of completed jobs in nanoseconds",
cpu_time_histogram: metric_registry_v2.register_metric_with_options(
"influxdb_iox_job_completed_cpu",
"CPU time of completed jobs",
Self::duration_histogram_options,
),
wall_time_histogram: metric_registry_v2.register_metric(
"influxdb_iox_job_completed_wall_nanoseconds",
"Wall time of of completed jobs in nanoseconds",
wall_time_histogram: metric_registry_v2.register_metric_with_options(
"influxdb_iox_job_completed_wall",
"Wall time of completed jobs",
Self::duration_histogram_options,
),
completed_but_still_tracked: Default::default(),
}
}
/// Bucket boundaries shared by the job-completion CPU-time and wall-time
/// histograms.
///
/// Boundaries follow a 1-2.5-5 decade progression from 5 ms up to
/// 5,000,000 ms, terminated by `DURATION_MAX` as the catch-all upper bound.
///
/// Note: the original list repeated the 1 000 / 2 500 / 5 000 / 10 000 ms
/// boundaries (once without and once with digit separators); histogram bucket
/// bounds must be strictly increasing, so the duplicates are removed here.
fn duration_histogram_options() -> metric::DurationHistogramOptions {
    metric::DurationHistogramOptions::new(vec![
        Duration::from_millis(5),
        Duration::from_millis(10),
        Duration::from_millis(25),
        Duration::from_millis(50),
        Duration::from_millis(100),
        Duration::from_millis(250),
        Duration::from_millis(500),
        Duration::from_millis(1_000),
        Duration::from_millis(2_500),
        Duration::from_millis(5_000),
        Duration::from_millis(10_000),
        Duration::from_millis(25_000),
        Duration::from_millis(50_000),
        Duration::from_millis(100_000),
        Duration::from_millis(250_000),
        Duration::from_millis(500_000),
        Duration::from_millis(1_000_000),
        Duration::from_millis(2_500_000),
        Duration::from_millis(5_000_000),
        metric::DURATION_MAX,
    ])
}
fn update(&mut self, registry: &TaskRegistryWithHistory<Job>, pruned: Vec<TaskTracker<Job>>) {
// scan pruned jobs
for job in pruned {
@ -164,7 +196,7 @@ impl JobRegistryMetrics {
let metadata = job.metadata();
let status = job.get_status();
metric::Attributes::from(&[
let mut attributes = metric::Attributes::from(&[
("description", metadata.description()),
(
"status",
@ -173,7 +205,15 @@ impl JobRegistryMetrics {
.map(|result| result.name())
.unwrap_or_else(|| status.name()),
),
])
]);
if let Some(db_name) = metadata.db_name() {
attributes.insert("db_name", db_name.to_string());
}
if let Some(table) = metadata.table_name() {
attributes.insert("table", table.to_string());
}
attributes
}
fn process_completed_job(&mut self, job: &TaskTracker<Job>) {

View File

@ -2439,7 +2439,7 @@ mod tests {
let wait_nanos = 1000;
let job = application
.job_registry()
.spawn_dummy_job(vec![wait_nanos], None);
.spawn_dummy_job(vec![wait_nanos], Some(Arc::from("some_db")));
job.join().await;
@ -2453,81 +2453,37 @@ mod tests {
server.join().await.unwrap();
// ========== influxdb_iox_job_count ==========
let observations: Vec<_> = reporter
.observations()
.iter()
.filter(|observation| observation.metric_name == "influxdb_iox_job_count")
.collect();
assert_eq!(observations.len(), 1);
let gauge = observations[0];
assert_eq!(gauge.kind, metric::MetricKind::U64Gauge);
let observations: Vec<_> = gauge
.observations
.iter()
.filter(|(attributes, _)| {
attributes
.iter()
.any(|(k, v)| (k == &"status") && (v == "Success"))
})
.collect();
assert_eq!(observations.len(), 1);
let (attributes, observation) = &observations[0];
assert_eq!(
attributes,
&metric::Attributes::from(&[
let metric = reporter.metric("influxdb_iox_job_count").unwrap();
assert_eq!(metric.kind, metric::MetricKind::U64Gauge);
let observation = metric
.observation(&[
("description", "Dummy Job, for testing"),
("status", "Success"),
("db_name", "some_db"),
])
);
.unwrap();
assert_eq!(observation, &metric::Observation::U64Gauge(1));
// ========== influxdb_iox_job_completed_cpu_nanoseconds ==========
let observations: Vec<_> = reporter
.observations()
.iter()
.filter(|observation| {
observation.metric_name == "influxdb_iox_job_completed_cpu_nanoseconds"
})
.collect();
assert_eq!(observations.len(), 1);
let histogram = observations[0];
assert_eq!(histogram.kind, metric::MetricKind::DurationHistogram);
assert_eq!(histogram.observations.len(), 1);
let (attributes, _) = &histogram.observations[0];
assert_eq!(
attributes,
&metric::Attributes::from(&[
// ========== influxdb_iox_job_completed_cpu ==========
let metric = reporter.metric("influxdb_iox_job_completed_cpu").unwrap();
assert_eq!(metric.kind, metric::MetricKind::DurationHistogram);
metric
.observation(&[
("description", "Dummy Job, for testing"),
("status", "Success"),
("db_name", "some_db"),
])
);
.unwrap();
// ========== influxdb_iox_job_completed_wall_nanoseconds ==========
let observations: Vec<_> = reporter
.observations()
.iter()
.filter(|observation| {
observation.metric_name == "influxdb_iox_job_completed_wall_nanoseconds"
})
.collect();
assert_eq!(observations.len(), 1);
let histogram = observations[0];
assert_eq!(histogram.kind, metric::MetricKind::DurationHistogram);
assert_eq!(histogram.observations.len(), 1);
let (attributes, _) = &histogram.observations[0];
assert_eq!(
attributes,
&metric::Attributes::from(&[
// ========== influxdb_iox_job_completed_wall ==========
let metric = reporter.metric("influxdb_iox_job_completed_wall").unwrap();
assert_eq!(metric.kind, metric::MetricKind::DurationHistogram);
metric
.observation(&[
("description", "Dummy Job, for testing"),
("status", "Success"),
("db_name", "some_db"),
])
);
.unwrap();
}
}