Merge branch 'main' into dom/cached-fsm-schema

pull/24376/head
Dom 2023-07-27 10:31:02 +01:00 committed by GitHub
commit a37b85804d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 509 additions and 224 deletions

21
Cargo.lock generated
View File

@ -981,6 +981,7 @@ dependencies = [
"metric", "metric",
"object_store", "object_store",
"observability_deps", "observability_deps",
"parking_lot",
"parquet_file", "parquet_file",
"rand", "rand",
"schema", "schema",
@ -4184,9 +4185,9 @@ checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]] [[package]]
name = "pprof" name = "pprof"
version = "0.12.0" version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b90f8560ad8bd57b207b8293bc5226e48e89039a6e590c12a297d91b84c7e60" checksum = "978385d59daf9269189d052ca8a84c1acfd0715c0599a5d5188d4acc078ca46a"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"cfg-if", "cfg-if",
@ -4905,18 +4906,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.175" version = "1.0.176"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d25439cd7397d044e2748a6fe2432b5e85db703d6d097bd014b3c0ad1ebff0b" checksum = "76dc28c9523c5d70816e393136b86d48909cfb27cecaa902d338c19ed47164dc"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.175" version = "1.0.176"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4" checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -4925,9 +4926,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.103" version = "1.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -5690,9 +5691,9 @@ checksum = "d3543ca0810e71767052bdcdd5653f23998b192642a22c5164bfa6581e40a4a2"
[[package]] [[package]]
name = "sysinfo" name = "sysinfo"
version = "0.29.6" version = "0.29.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7cb97a5a85a136d84e75d5c3cf89655090602efb1be0d8d5337b7e386af2908" checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"core-foundation-sys", "core-foundation-sys",

View File

@ -125,6 +125,18 @@ pub struct RouterConfig {
default_value = "1" default_value = "1"
)] )]
pub rpc_write_replicas: NonZeroUsize, pub rpc_write_replicas: NonZeroUsize,
/// Specify the (discrete) slices of time in which the router's write
/// request failures must exceed the write client's maximum error ratio of
/// 80% for a downstream RPC write handler to be considered in the unhealthy
/// state.
#[clap(
long = "rpc-write-health-error-window-seconds",
env = "INFLUXDB_IOX_RPC_WRITE_HEALTH_ERROR_WINDOW_SECONDS",
default_value = "5",
value_parser = parse_duration
)]
pub rpc_write_health_error_window_seconds: Duration,
} }
/// Map a string containing an integer number of seconds into a [`Duration`]. /// Map a string containing an integer number of seconds into a [`Duration`].

View File

@ -30,6 +30,7 @@ trace = { version = "0.1.0", path = "../trace" }
tracker = { path = "../tracker" } tracker = { path = "../tracker" }
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" } workspace-hack = { version = "0.1", path = "../workspace-hack" }
parking_lot = "0.12.1"
[dev-dependencies] [dev-dependencies]
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }

View File

@ -4,7 +4,7 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use compactor_scheduler::{create_scheduler, PartitionsSource, Scheduler}; use compactor_scheduler::{create_scheduler, Scheduler};
use data_types::CompactionLevel; use data_types::CompactionLevel;
use object_store::memory::InMemory; use object_store::memory::InMemory;
@ -64,7 +64,7 @@ use super::{
logging::LoggingPartitionsSourceWrapper, metrics::MetricsPartitionsSourceWrapper, logging::LoggingPartitionsSourceWrapper, metrics::MetricsPartitionsSourceWrapper,
not_empty::NotEmptyPartitionsSourceWrapper, not_empty::NotEmptyPartitionsSourceWrapper,
randomize_order::RandomizeOrderPartitionsSourcesWrapper, randomize_order::RandomizeOrderPartitionsSourcesWrapper,
scheduled::ScheduledPartitionsSource, scheduled::ScheduledPartitionsSource, PartitionsSource,
}, },
post_classification_partition_filter::{ post_classification_partition_filter::{
logging::LoggingPostClassificationFilterWrapper, logging::LoggingPostClassificationFilterWrapper,

View File

@ -1,10 +1,11 @@
use std::{collections::VecDeque, fmt::Display, sync::Arc}; use std::{collections::VecDeque, fmt::Display, sync::Arc};
use compactor_scheduler::PartitionsSource; use compactor_scheduler::CompactionJob;
use data_types::PartitionId;
use futures::{stream::BoxStream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use super::super::partition_files_source::rate_limit::RateLimit; use super::super::{
partition_files_source::rate_limit::RateLimit, partitions_source::PartitionsSource,
};
use super::PartitionStream; use super::PartitionStream;
#[derive(Debug)] #[derive(Debug)]
@ -41,7 +42,7 @@ impl<T> PartitionStream for EndlessPartititionStream<T>
where where
T: PartitionsSource, T: PartitionsSource,
{ {
fn stream(&self) -> BoxStream<'_, PartitionId> { fn stream(&self) -> BoxStream<'_, CompactionJob> {
let source = Arc::clone(&self.source); let source = Arc::clone(&self.source);
// Note: we use a VecDeque as a buffer so we can preserve the order and cheaply remove the first element without // Note: we use a VecDeque as a buffer so we can preserve the order and cheaply remove the first element without
@ -79,9 +80,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use compactor_scheduler::MockPartitionsSource; use data_types::PartitionId;
use super::*; use super::{super::super::partitions_source::mock::MockPartitionsSource, *};
#[test] #[test]
fn test_display() { fn test_display() {
@ -92,9 +93,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_stream() { async fn test_stream() {
let ids = vec![ let ids = vec![
PartitionId::new(1), CompactionJob::new(PartitionId::new(1)),
PartitionId::new(3), CompactionJob::new(PartitionId::new(3)),
PartitionId::new(2), CompactionJob::new(PartitionId::new(2)),
]; ];
let stream = EndlessPartititionStream::new(MockPartitionsSource::new(ids.clone())); let stream = EndlessPartititionStream::new(MockPartitionsSource::new(ids.clone()));
@ -103,13 +104,7 @@ mod tests {
// we need to limit the stream at one point because it is endless // we need to limit the stream at one point because it is endless
assert_eq!( assert_eq!(
stream.stream().take(5).collect::<Vec<_>>().await, stream.stream().take(5).collect::<Vec<_>>().await,
vec![ [&ids[..], &ids[..2]].concat(),
PartitionId::new(1),
PartitionId::new(3),
PartitionId::new(2),
PartitionId::new(1),
PartitionId::new(3)
],
); );
} }
} }

View File

@ -1,6 +1,6 @@
use std::fmt::{Debug, Display}; use std::fmt::{Debug, Display};
use data_types::PartitionId; use compactor_scheduler::CompactionJob;
use futures::stream::BoxStream; use futures::stream::BoxStream;
pub mod endless; pub mod endless;
@ -8,8 +8,8 @@ pub mod once;
/// Source for partitions. /// Source for partitions.
pub trait PartitionStream: Debug + Display + Send + Sync { pub trait PartitionStream: Debug + Display + Send + Sync {
/// Create new source stream of partitions. /// Create new source stream of compaction jobs.
/// ///
/// This stream may be endless. /// This stream may be endless.
fn stream(&self) -> BoxStream<'_, PartitionId>; fn stream(&self) -> BoxStream<'_, CompactionJob>;
} }

View File

@ -1,10 +1,9 @@
use std::{fmt::Display, sync::Arc}; use std::{fmt::Display, sync::Arc};
use compactor_scheduler::PartitionsSource; use compactor_scheduler::CompactionJob;
use data_types::PartitionId;
use futures::{stream::BoxStream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use super::PartitionStream; use super::{super::partitions_source::PartitionsSource, PartitionStream};
#[derive(Debug)] #[derive(Debug)]
pub struct OncePartititionStream<T> pub struct OncePartititionStream<T>
@ -38,7 +37,7 @@ impl<T> PartitionStream for OncePartititionStream<T>
where where
T: PartitionsSource, T: PartitionsSource,
{ {
fn stream(&self) -> BoxStream<'_, PartitionId> { fn stream(&self) -> BoxStream<'_, CompactionJob> {
let source = Arc::clone(&self.source); let source = Arc::clone(&self.source);
futures::stream::once(async move { futures::stream::iter(source.fetch().await) }) futures::stream::once(async move { futures::stream::iter(source.fetch().await) })
.flatten() .flatten()
@ -48,9 +47,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use compactor_scheduler::MockPartitionsSource; use data_types::PartitionId;
use super::*; use super::{super::super::partitions_source::mock::MockPartitionsSource, *};
#[test] #[test]
fn test_display() { fn test_display() {
@ -61,9 +60,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_stream() { async fn test_stream() {
let ids = vec![ let ids = vec![
PartitionId::new(1), CompactionJob::new(PartitionId::new(1)),
PartitionId::new(3), CompactionJob::new(PartitionId::new(3)),
PartitionId::new(2), CompactionJob::new(PartitionId::new(2)),
]; ];
let stream = OncePartititionStream::new(MockPartitionsSource::new(ids.clone())); let stream = OncePartititionStream::new(MockPartitionsSource::new(ids.clone()));

View File

@ -1,10 +1,11 @@
use std::fmt::Display; use std::fmt::Display;
use async_trait::async_trait; use async_trait::async_trait;
use compactor_scheduler::PartitionsSource; use compactor_scheduler::CompactionJob;
use data_types::PartitionId;
use observability_deps::tracing::{info, warn}; use observability_deps::tracing::{info, warn};
use super::PartitionsSource;
#[derive(Debug)] #[derive(Debug)]
pub struct LoggingPartitionsSourceWrapper<T> pub struct LoggingPartitionsSourceWrapper<T>
where where
@ -36,7 +37,7 @@ impl<T> PartitionsSource for LoggingPartitionsSourceWrapper<T>
where where
T: PartitionsSource, T: PartitionsSource,
{ {
async fn fetch(&self) -> Vec<PartitionId> { async fn fetch(&self) -> Vec<CompactionJob> {
let partitions = self.inner.fetch().await; let partitions = self.inner.fetch().await;
info!(n_partitions = partitions.len(), "Fetch partitions",); info!(n_partitions = partitions.len(), "Fetch partitions",);
if partitions.is_empty() { if partitions.is_empty() {
@ -48,10 +49,10 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use compactor_scheduler::MockPartitionsSource; use data_types::PartitionId;
use test_helpers::tracing::TracingCapture; use test_helpers::tracing::TracingCapture;
use super::*; use super::{super::mock::MockPartitionsSource, *};
#[test] #[test]
fn test_display() { fn test_display() {
@ -74,9 +75,9 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_fetch_some() { async fn test_fetch_some() {
let p_1 = PartitionId::new(5); let p_1 = CompactionJob::new(PartitionId::new(5));
let p_2 = PartitionId::new(1); let p_2 = CompactionJob::new(PartitionId::new(1));
let p_3 = PartitionId::new(12); let p_3 = CompactionJob::new(PartitionId::new(12));
let partitions = vec![p_1, p_2, p_3]; let partitions = vec![p_1, p_2, p_3];
let source = let source =

View File

@ -1,10 +1,11 @@
use std::fmt::Display; use std::fmt::Display;
use async_trait::async_trait; use async_trait::async_trait;
use compactor_scheduler::PartitionsSource; use compactor_scheduler::CompactionJob;
use data_types::PartitionId;
use metric::{Registry, U64Counter}; use metric::{Registry, U64Counter};
use super::PartitionsSource;
const METRIC_NAME_PARTITIONS_FETCH_COUNT: &str = "iox_compactor_partitions_fetch_count"; const METRIC_NAME_PARTITIONS_FETCH_COUNT: &str = "iox_compactor_partitions_fetch_count";
const METRIC_NAME_PARTITIONS_COUNT: &str = "iox_compactor_partitions_count"; const METRIC_NAME_PARTITIONS_COUNT: &str = "iox_compactor_partitions_count";
@ -58,7 +59,7 @@ impl<T> PartitionsSource for MetricsPartitionsSourceWrapper<T>
where where
T: PartitionsSource, T: PartitionsSource,
{ {
async fn fetch(&self) -> Vec<PartitionId> { async fn fetch(&self) -> Vec<CompactionJob> {
let partitions = self.inner.fetch().await; let partitions = self.inner.fetch().await;
self.partitions_fetch_counter.inc(1); self.partitions_fetch_counter.inc(1);
self.partitions_counter.inc(partitions.len() as u64); self.partitions_counter.inc(partitions.len() as u64);
@ -68,10 +69,10 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use compactor_scheduler::MockPartitionsSource; use data_types::PartitionId;
use metric::assert_counter; use metric::assert_counter;
use super::*; use super::{super::mock::MockPartitionsSource, *};
#[test] #[test]
fn test_display() { fn test_display() {
@ -85,9 +86,9 @@ mod tests {
async fn test_fetch() { async fn test_fetch() {
let registry = Registry::new(); let registry = Registry::new();
let partitions = vec![ let partitions = vec![
PartitionId::new(5), CompactionJob::new(PartitionId::new(5)),
PartitionId::new(1), CompactionJob::new(PartitionId::new(1)),
PartitionId::new(12), CompactionJob::new(PartitionId::new(12)),
]; ];
let source = MetricsPartitionsSourceWrapper::new( let source = MetricsPartitionsSourceWrapper::new(
MockPartitionsSource::new(partitions.clone()), MockPartitionsSource::new(partitions.clone()),

View File

@ -0,0 +1,65 @@
use async_trait::async_trait;
use compactor_scheduler::CompactionJob;
use parking_lot::Mutex;
use super::PartitionsSource;
/// A mock [`PartitionsSource`] that serves a caller-supplied list of
/// [compaction jobs](CompactionJob).
///
/// The list is kept behind a [`Mutex`] so it can be swapped through a shared
/// reference via `set`.
#[derive(Debug)]
pub struct MockPartitionsSource {
/// The jobs that are cloned out on each `fetch` call.
partitions: Mutex<Vec<CompactionJob>>,
}
impl MockPartitionsSource {
    /// Build a mock source that initially holds the given compaction jobs.
    #[allow(dead_code)] // appears to be called only from test code
    pub fn new(partitions: Vec<CompactionJob>) -> Self {
        let partitions = Mutex::new(partitions);
        Self { partitions }
    }

    /// Replace the stored compaction jobs with `partitions`.
    ///
    /// Takes `&self` because the list lives behind a mutex.
    #[allow(dead_code)] // appears to be called only from test code
    pub fn set(&self, partitions: Vec<CompactionJob>) {
        let mut stored = self.partitions.lock();
        *stored = partitions;
    }
}
impl std::fmt::Display for MockPartitionsSource {
    /// Renders the fixed label `mock`; the stored jobs are not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("mock")
    }
}
#[async_trait]
impl PartitionsSource for MockPartitionsSource {
    /// Return a copy of the currently stored compaction jobs.
    ///
    /// The lock is only held for the duration of the copy.
    async fn fetch(&self) -> Vec<CompactionJob> {
        self.partitions.lock().to_vec()
    }
}
#[cfg(test)]
mod tests {
    use data_types::PartitionId;

    use super::*;

    /// The mock always displays as the literal `mock`.
    #[test]
    fn test_display() {
        let rendered = MockPartitionsSource::new(Vec::new()).to_string();
        assert_eq!(rendered, "mock");
    }

    /// `fetch` reflects whatever was most recently stored via `new` / `set`.
    #[tokio::test]
    async fn test_fetch() {
        // A mock built from an empty list yields nothing.
        let source = MockPartitionsSource::new(Vec::new());
        assert_eq!(source.fetch().await, Vec::<CompactionJob>::new());

        // After `set`, the same jobs come back in the same order.
        let parts: Vec<CompactionJob> = [5, 1, 12]
            .into_iter()
            .map(|id| CompactionJob::new(PartitionId::new(id)))
            .collect();
        source.set(parts.clone());
        assert_eq!(source.fetch().await, parts);
    }
}

View File

@ -1,8 +1,38 @@
//! Abstractions that provide functionality over a [`PartitionsSource`](compactor_scheduler::PartitionsSource) of PartitionIds. //! Abstractions that provide functionality over a [`PartitionsSource`] of CompactionJobs.
//! //!
//! These abstractions are for actions taken in a compactor using the PartitionIds received from a compactor_scheduler. //! These abstractions are for actions taken in a compactor using the CompactionJobs received from a compactor_scheduler.
pub mod logging; pub mod logging;
pub mod metrics; pub mod metrics;
pub mod mock;
pub mod not_empty; pub mod not_empty;
pub mod randomize_order; pub mod randomize_order;
pub mod scheduled; pub mod scheduled;
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use compactor_scheduler::CompactionJob;
/// A source of partitions that may potentially need compacting, each described
/// by a [`CompactionJob`](compactor_scheduler::CompactionJob).
#[async_trait]
pub trait PartitionsSource: Debug + Display + Send + Sync {
/// Get the compaction jobs for partitions that may need compacting.
///
/// This method performs retries.
///
/// This should only perform basic, efficient filtering. It MUST NOT inspect individual parquet files.
async fn fetch(&self) -> Vec<CompactionJob>;
}
#[async_trait]
impl<T> PartitionsSource for Arc<T>
where
    T: PartitionsSource + ?Sized,
{
    /// Forward the call to the source held inside the [`Arc`].
    async fn fetch(&self) -> Vec<CompactionJob> {
        // Deref-coerce `&Arc<T>` to `&T` so the call resolves to the inner
        // source's `fetch` rather than this impl.
        let inner: &T = self;
        inner.fetch().await
    }
}

View File

@ -1,10 +1,11 @@
use std::{fmt::Display, sync::Arc, time::Duration}; use std::{fmt::Display, sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use compactor_scheduler::PartitionsSource; use compactor_scheduler::CompactionJob;
use data_types::PartitionId;
use iox_time::TimeProvider; use iox_time::TimeProvider;
use super::PartitionsSource;
#[derive(Debug)] #[derive(Debug)]
pub struct NotEmptyPartitionsSourceWrapper<T> pub struct NotEmptyPartitionsSourceWrapper<T>
where where
@ -42,7 +43,7 @@ impl<T> PartitionsSource for NotEmptyPartitionsSourceWrapper<T>
where where
T: PartitionsSource, T: PartitionsSource,
{ {
async fn fetch(&self) -> Vec<PartitionId> { async fn fetch(&self) -> Vec<CompactionJob> {
loop { loop {
let res = self.inner.fetch().await; let res = self.inner.fetch().await;
if !res.is_empty() { if !res.is_empty() {
@ -55,11 +56,11 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use compactor_scheduler::MockPartitionsSource;
use compactor_test_utils::AssertFutureExt; use compactor_test_utils::AssertFutureExt;
use data_types::PartitionId;
use iox_time::{MockProvider, Time}; use iox_time::{MockProvider, Time};
use super::*; use super::{super::mock::MockPartitionsSource, *};
#[test] #[test]
fn test_display() { fn test_display() {
@ -90,7 +91,7 @@ mod tests {
fut.assert_pending().await; fut.assert_pending().await;
// insert data but system is still throttled // insert data but system is still throttled
let p = PartitionId::new(5); let p = CompactionJob::new(PartitionId::new(5));
let parts = vec![p]; let parts = vec![p];
inner.set(parts.clone()); inner.set(parts.clone());
fut.assert_pending().await; fut.assert_pending().await;

View File

@ -1,10 +1,11 @@
use std::fmt::Display; use std::fmt::Display;
use async_trait::async_trait; use async_trait::async_trait;
use compactor_scheduler::PartitionsSource; use compactor_scheduler::CompactionJob;
use data_types::PartitionId;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use super::PartitionsSource;
#[derive(Debug)] #[derive(Debug)]
pub struct RandomizeOrderPartitionsSourcesWrapper<T> pub struct RandomizeOrderPartitionsSourcesWrapper<T>
where where
@ -37,7 +38,7 @@ impl<T> PartitionsSource for RandomizeOrderPartitionsSourcesWrapper<T>
where where
T: PartitionsSource, T: PartitionsSource,
{ {
async fn fetch(&self) -> Vec<PartitionId> { async fn fetch(&self) -> Vec<CompactionJob> {
let mut partitions = self.inner.fetch().await; let mut partitions = self.inner.fetch().await;
let mut rng = StdRng::seed_from_u64(self.seed); let mut rng = StdRng::seed_from_u64(self.seed);
partitions.shuffle(&mut rng); partitions.shuffle(&mut rng);
@ -47,9 +48,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use compactor_scheduler::MockPartitionsSource; use data_types::PartitionId;
use super::*; use super::{super::mock::MockPartitionsSource, *};
#[test] #[test]
fn test_display() { fn test_display() {
@ -67,21 +68,27 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_fetch_some() { async fn test_fetch_some() {
let p_1 = PartitionId::new(5); let p_1 = CompactionJob::new(PartitionId::new(5));
let p_2 = PartitionId::new(1); let p_2 = CompactionJob::new(PartitionId::new(1));
let p_3 = PartitionId::new(12); let p_3 = CompactionJob::new(PartitionId::new(12));
let partitions = vec![p_1, p_2, p_3]; let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()];
// shuffles // shuffles
let source = RandomizeOrderPartitionsSourcesWrapper::new( let source = RandomizeOrderPartitionsSourcesWrapper::new(
MockPartitionsSource::new(partitions.clone()), MockPartitionsSource::new(partitions.clone()),
123, 123,
); );
assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); assert_eq!(
source.fetch().await,
vec![p_3.clone(), p_2.clone(), p_1.clone(),],
);
// is deterministic in same source // is deterministic in same source
for _ in 0..100 { for _ in 0..100 {
assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); assert_eq!(
source.fetch().await,
vec![p_3.clone(), p_2.clone(), p_1.clone(),],
);
} }
// is deterministic with new source // is deterministic with new source
@ -90,7 +97,10 @@ mod tests {
MockPartitionsSource::new(partitions.clone()), MockPartitionsSource::new(partitions.clone()),
123, 123,
); );
assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); assert_eq!(
source.fetch().await,
vec![p_3.clone(), p_2.clone(), p_1.clone(),],
);
} }
// different seed => different output // different seed => different output

View File

@ -1,8 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use compactor_scheduler::{CompactionJob, PartitionsSource, Scheduler}; use compactor_scheduler::{CompactionJob, Scheduler};
use data_types::PartitionId;
use super::PartitionsSource;
#[derive(Debug)] #[derive(Debug)]
pub struct ScheduledPartitionsSource { pub struct ScheduledPartitionsSource {
@ -17,9 +18,8 @@ impl ScheduledPartitionsSource {
#[async_trait] #[async_trait]
impl PartitionsSource for ScheduledPartitionsSource { impl PartitionsSource for ScheduledPartitionsSource {
async fn fetch(&self) -> Vec<PartitionId> { async fn fetch(&self) -> Vec<CompactionJob> {
let job: Vec<CompactionJob> = self.scheduler.get_jobs().await; self.scheduler.get_jobs().await
job.into_iter().map(|job| job.partition_id).collect()
} }
} }

View File

@ -1,6 +1,7 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use chrono::Utc; use chrono::Utc;
use compactor_scheduler::CompactionJob;
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId}; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use futures::{stream, StreamExt, TryStreamExt}; use futures::{stream, StreamExt, TryStreamExt};
use iox_query::exec::query_tracing::send_metrics_to_tracing; use iox_query::exec::query_tracing::send_metrics_to_tracing;
@ -36,7 +37,7 @@ pub async fn compact(
components components
.partition_stream .partition_stream
.stream() .stream()
.map(|partition_id| { .map(|job| {
let components = Arc::clone(components); let components = Arc::clone(components);
// A root span is created for each partition. Later this can be linked to the // A root span is created for each partition. Later this can be linked to the
@ -48,7 +49,7 @@ pub async fn compact(
compact_partition( compact_partition(
span, span,
partition_id, job,
partition_timeout, partition_timeout,
Arc::clone(&df_semaphore), Arc::clone(&df_semaphore),
components, components,
@ -61,11 +62,12 @@ pub async fn compact(
async fn compact_partition( async fn compact_partition(
mut span: SpanRecorder, mut span: SpanRecorder,
partition_id: PartitionId, job: CompactionJob,
partition_timeout: Duration, partition_timeout: Duration,
df_semaphore: Arc<InstrumentedAsyncSemaphore>, df_semaphore: Arc<InstrumentedAsyncSemaphore>,
components: Arc<Components>, components: Arc<Components>,
) { ) {
let partition_id = job.partition_id;
info!(partition_id = partition_id.get(), timeout = ?partition_timeout, "compact partition",); info!(partition_id = partition_id.get(), timeout = ?partition_timeout, "compact partition",);
span.set_metadata("partition_id", partition_id.get().to_string()); span.set_metadata("partition_id", partition_id.get().to_string());
let scratchpad = components.scratchpad_gen.pad(); let scratchpad = components.scratchpad_gen.pad();
@ -76,7 +78,7 @@ async fn compact_partition(
async { async {
try_compact_partition( try_compact_partition(
span, span,
partition_id, job.clone(),
df_semaphore, df_semaphore,
components, components,
scratchpad, scratchpad,
@ -203,12 +205,13 @@ async fn compact_partition(
/// . Round 2 happens or not depends on the stop condition /// . Round 2 happens or not depends on the stop condition
async fn try_compact_partition( async fn try_compact_partition(
span: SpanRecorder, span: SpanRecorder,
partition_id: PartitionId, job: CompactionJob,
df_semaphore: Arc<InstrumentedAsyncSemaphore>, df_semaphore: Arc<InstrumentedAsyncSemaphore>,
components: Arc<Components>, components: Arc<Components>,
scratchpad_ctx: Arc<dyn Scratchpad>, scratchpad_ctx: Arc<dyn Scratchpad>,
transmit_progress_signal: Sender<bool>, transmit_progress_signal: Sender<bool>,
) -> Result<(), DynError> { ) -> Result<(), DynError> {
let partition_id = job.partition_id;
let mut files = components.partition_files_source.fetch(partition_id).await; let mut files = components.partition_files_source.fetch(partition_id).await;
let partition_info = components.partition_info_source.fetch(partition_id).await?; let partition_info = components.partition_info_source.fetch(partition_id).await?;
let transmit_progress_signal = Arc::new(transmit_progress_signal); let transmit_progress_signal = Arc::new(transmit_progress_signal);
@ -269,12 +272,13 @@ async fn try_compact_partition(
let df_semaphore = Arc::clone(&df_semaphore); let df_semaphore = Arc::clone(&df_semaphore);
let transmit_progress_signal = Arc::clone(&transmit_progress_signal); let transmit_progress_signal = Arc::clone(&transmit_progress_signal);
let scratchpad = Arc::clone(&scratchpad_ctx); let scratchpad = Arc::clone(&scratchpad_ctx);
let job = job.clone();
let branch_span = round_span.child("branch"); let branch_span = round_span.child("branch");
async move { async move {
execute_branch( execute_branch(
branch_span, branch_span,
partition_id, job,
branch, branch,
df_semaphore, df_semaphore,
components, components,
@ -298,7 +302,7 @@ async fn try_compact_partition(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn execute_branch( async fn execute_branch(
span: SpanRecorder, span: SpanRecorder,
partition_id: PartitionId, job: CompactionJob,
branch: Vec<ParquetFile>, branch: Vec<ParquetFile>,
df_semaphore: Arc<InstrumentedAsyncSemaphore>, df_semaphore: Arc<InstrumentedAsyncSemaphore>,
components: Arc<Components>, components: Arc<Components>,
@ -418,7 +422,7 @@ async fn execute_branch(
// files and update the upgraded files // files and update the upgraded files
let (created_files, upgraded_files) = update_catalog( let (created_files, upgraded_files) = update_catalog(
Arc::clone(&components), Arc::clone(&components),
partition_id, job.clone(),
&saved_parquet_file_state, &saved_parquet_file_state,
files_to_delete, files_to_delete,
upgrade, upgrade,
@ -627,13 +631,14 @@ async fn fetch_and_save_parquet_file_state(
/// Return created and upgraded files /// Return created and upgraded files
async fn update_catalog( async fn update_catalog(
components: Arc<Components>, components: Arc<Components>,
partition_id: PartitionId, job: CompactionJob,
saved_parquet_file_state: &SavedParquetFileState, saved_parquet_file_state: &SavedParquetFileState,
files_to_delete: Vec<ParquetFile>, files_to_delete: Vec<ParquetFile>,
files_to_upgrade: Vec<ParquetFile>, files_to_upgrade: Vec<ParquetFile>,
file_params_to_create: Vec<ParquetFileParams>, file_params_to_create: Vec<ParquetFileParams>,
target_level: CompactionLevel, target_level: CompactionLevel,
) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> { ) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> {
let partition_id = job.partition_id;
let current_parquet_file_state = let current_parquet_file_state =
fetch_and_save_parquet_file_state(&components, partition_id).await; fetch_and_save_parquet_file_state(&components, partition_id).await;

View File

@ -50,7 +50,7 @@ pub(crate) use local_scheduler::{
// partitions_source trait // partitions_source trait
mod partitions_source; mod partitions_source;
pub use partitions_source::*; pub(crate) use partitions_source::*;
// scheduler trait and associated types // scheduler trait and associated types
mod scheduler; mod scheduler;

View File

@ -9,7 +9,7 @@ use parking_lot::Mutex;
/// A source of partitions, noted by [`PartitionId`](data_types::PartitionId), that may potentially need compacting. /// A source of partitions, noted by [`PartitionId`](data_types::PartitionId), that may potentially need compacting.
#[async_trait] #[async_trait]
pub trait PartitionsSource: Debug + Display + Send + Sync { pub(crate) trait PartitionsSource: Debug + Display + Send + Sync {
/// Get partition IDs. /// Get partition IDs.
/// ///
/// This method performs retries. /// This method performs retries.
@ -28,19 +28,19 @@ where
} }
} }
pub use mock::MockPartitionsSource; pub(crate) use mock::MockPartitionsSource;
mod mock { mod mock {
use super::*; use super::*;
/// A mock structure for providing [partitions](PartitionId). /// A mock structure for providing [partitions](PartitionId).
#[derive(Debug)] #[derive(Debug)]
pub struct MockPartitionsSource { pub(crate) struct MockPartitionsSource {
partitions: Mutex<Vec<PartitionId>>, partitions: Mutex<Vec<PartitionId>>,
} }
impl MockPartitionsSource { impl MockPartitionsSource {
/// Create a new MockPartitionsSource. /// Create a new MockPartitionsSource.
pub fn new(partitions: Vec<PartitionId>) -> Self { pub(crate) fn new(partitions: Vec<PartitionId>) -> Self {
Self { Self {
partitions: Mutex::new(partitions), partitions: Mutex::new(partitions),
} }
@ -48,7 +48,7 @@ mod mock {
/// Set PartitionIds for MockPartitionsSource. /// Set PartitionIds for MockPartitionsSource.
#[allow(dead_code)] // not used anywhere #[allow(dead_code)] // not used anywhere
pub fn set(&self, partitions: Vec<PartitionId>) { pub(crate) fn set(&self, partitions: Vec<PartitionId>) {
*self.partitions.lock() = partitions; *self.partitions.lock() = partitions;
} }
} }

View File

@ -60,7 +60,7 @@ impl std::fmt::Display for SchedulerConfig {
} }
/// Job assignment for a given partition. /// Job assignment for a given partition.
#[derive(Debug, Clone)] #[derive(Debug, Clone, PartialEq)]
pub struct CompactionJob { pub struct CompactionJob {
#[allow(dead_code)] #[allow(dead_code)]
/// Unique identifier for this job. /// Unique identifier for this job.

View File

@ -16,7 +16,7 @@ parquet_file = { path = "../parquet_file" }
object_store = { workspace=true } object_store = { workspace=true }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
schema = { path = "../schema" } schema = { path = "../schema" }
serde_json = "1.0.103" serde_json = "1.0.104"
thiserror = "1.0.44" thiserror = "1.0.44"
tokio = { version = "1.29" } tokio = { version = "1.29" }
tokio-util = { version = "0.7.8" } tokio-util = { version = "0.7.8" }

View File

@ -10,7 +10,7 @@ bytes = "1.4"
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
reqwest = { version = "0.11", default-features = false, features = ["stream", "json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["stream", "json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.103" serde_json = "1.0.104"
snafu = "0.7" snafu = "0.7"
url = "2.4.0" url = "2.4.0"
uuid = { version = "1", features = ["v4"] } uuid = { version = "1", features = ["v4"] }

View File

@ -67,7 +67,7 @@ libc = { version = "0.2" }
num_cpus = "1.16.0" num_cpus = "1.16.0"
once_cell = { version = "1.18", features = ["parking_lot"] } once_cell = { version = "1.18", features = ["parking_lot"] }
rustyline = { version = "12.0", default-features = false, features = ["with-file-history"]} rustyline = { version = "12.0", default-features = false, features = ["with-file-history"]}
serde_json = "1.0.103" serde_json = "1.0.104"
snafu = "0.7" snafu = "0.7"
tempfile = "3.7.0" tempfile = "3.7.0"
thiserror = "1.0.44" thiserror = "1.0.44"
@ -93,7 +93,7 @@ predicate = { path = "../predicate" }
predicates = "3.0.3" predicates = "3.0.3"
pretty_assertions = "1.4.0" pretty_assertions = "1.4.0"
proptest = { version = "1.2.0", default-features = false } proptest = { version = "1.2.0", default-features = false }
serde = "1.0.175" serde = "1.0.176"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }
insta = { version = "1", features = ["yaml"] } insta = { version = "1", features = ["yaml"] }

View File

@ -488,6 +488,7 @@ impl Config {
rpc_write_timeout_seconds: Duration::new(3, 0), rpc_write_timeout_seconds: Duration::new(3, 0),
rpc_write_replicas: 1.try_into().unwrap(), rpc_write_replicas: 1.try_into().unwrap(),
rpc_write_max_outgoing_bytes: ingester_config.rpc_write_max_incoming_bytes, rpc_write_max_outgoing_bytes: ingester_config.rpc_write_max_incoming_bytes,
rpc_write_health_error_window_seconds: Duration::from_secs(5),
}; };
// create a CompactorConfig for the all in one server based on // create a CompactorConfig for the all in one server based on

View File

@ -24,7 +24,7 @@ prost = "0.11"
rand = "0.8.3" rand = "0.8.3"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
schema = { path = "../schema" } schema = { path = "../schema" }
serde_json = "1.0.103" serde_json = "1.0.104"
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread"] } tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread"] }
tokio-stream = "0.1.13" tokio-stream = "0.1.13"
thiserror = "1.0.44" thiserror = "1.0.44"

View File

@ -40,6 +40,18 @@ impl WriteOperation {
} }
} }
/// Do NOT remove this test annotation. This constructor exists to by-pass
/// safety invariant assertions for testing code only.
#[cfg(test)]
pub(crate) fn new_empty_invalid(namespace: NamespaceId, partition_key: PartitionKey) -> Self {
Self {
namespace,
tables: Default::default(),
partition_key,
span_context: None,
}
}
/// The namespace which the write is /// The namespace which the write is
pub fn namespace(&self) -> NamespaceId { pub fn namespace(&self) -> NamespaceId {
self.namespace self.namespace

View File

@ -75,12 +75,12 @@ where
"Number of WAL files that have started to be replayed", "Number of WAL files that have started to be replayed",
) )
.recorder(&[]); .recorder(&[]);
let op_count_metric = metrics let op_count_metric = metrics.register_metric::<U64Counter>(
.register_metric::<U64Counter>( "ingester_wal_replay_ops",
"ingester_wal_replay_ops", "Number of operations replayed from the WAL",
"Number of operations successfully replayed from the WAL", );
) let ok_op_count_metric = op_count_metric.recorder(&[("outcome", "success")]);
.recorder(&[]); let empty_op_count_metric = op_count_metric.recorder(&[("outcome", "skipped_empty")]);
let n_files = files.len(); let n_files = files.len();
info!(n_files, "found wal files for replay"); info!(n_files, "found wal files for replay");
@ -112,7 +112,7 @@ where
); );
// Replay this segment file // Replay this segment file
match replay_file(reader, sink, &op_count_metric).await? { match replay_file(reader, sink, &ok_op_count_metric, &empty_op_count_metric).await? {
v @ Some(_) => max_sequence = max_sequence.max(v), v @ Some(_) => max_sequence = max_sequence.max(v),
None => { None => {
// This file was empty and should be deleted. // This file was empty and should be deleted.
@ -181,7 +181,8 @@ where
async fn replay_file<T>( async fn replay_file<T>(
file: wal::ClosedSegmentFileReader, file: wal::ClosedSegmentFileReader,
sink: &T, sink: &T,
op_count_metric: &U64Counter, ok_op_count_metric: &U64Counter,
empty_op_count_metric: &U64Counter,
) -> Result<Option<SequenceNumber>, WalReplayError> ) -> Result<Option<SequenceNumber>, WalReplayError>
where where
T: DmlSink, T: DmlSink,
@ -212,6 +213,12 @@ where
let namespace_id = NamespaceId::new(op.database_id); let namespace_id = NamespaceId::new(op.database_id);
let partition_key = PartitionKey::from(op.partition_key); let partition_key = PartitionKey::from(op.partition_key);
if batches.is_empty() {
warn!(%namespace_id, "encountered wal op containing no table data, skipping replay");
empty_op_count_metric.inc(1);
continue;
}
let op = WriteOperation::new( let op = WriteOperation::new(
namespace_id, namespace_id,
batches batches
@ -255,7 +262,7 @@ where
.await .await
.map_err(Into::<DmlError>::into)?; .map_err(Into::<DmlError>::into)?;
op_count_metric.inc(1); ok_op_count_metric.inc(1);
} }
} }
@ -368,12 +375,22 @@ mod tests {
), ),
); );
// Emulate a mid-write crash by inserting an op with no data
let empty_op = WriteOperation::new_empty_invalid(
ARBITRARY_NAMESPACE_ID,
ARBITRARY_PARTITION_KEY.clone(),
);
// The write portion of this test. // The write portion of this test.
// //
// Write two ops, rotate the file, and write a third op. // Write two ops, rotate the file, and write a third op.
{ {
let inner = let inner = Arc::new(MockDmlSink::default().with_apply_return(vec![
Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(()), Ok(())])); Ok(()),
Ok(()),
Ok(()),
Ok(()),
]));
let wal = Wal::new(dir.path()) let wal = Wal::new(dir.path())
.await .await
.expect("failed to initialise WAL"); .expect("failed to initialise WAL");
@ -405,8 +422,14 @@ mod tests {
.await .await
.expect("wal should not error"); .expect("wal should not error");
// Write the empty op
wal_sink
.apply(IngestOp::Write(empty_op))
.await
.expect("wal should not error");
// Assert the mock inner sink saw the calls // Assert the mock inner sink saw the calls
assert_eq!(inner.get_calls().len(), 3); assert_eq!(inner.get_calls().len(), 4);
} }
// Reinitialise the WAL // Reinitialise the WAL
@ -448,14 +471,15 @@ mod tests {
assert_eq!(max_sequence_number, Some(SequenceNumber::new(43))); assert_eq!(max_sequence_number, Some(SequenceNumber::new(43)));
// Assert the ops were pushed into the DmlSink exactly as generated. // Assert the ops were pushed into the DmlSink exactly as generated,
// barring the empty op which is skipped
let ops = mock_iter.sink.get_calls(); let ops = mock_iter.sink.get_calls();
assert_matches!( assert_matches!(
&*ops, &*ops,
&[ &[
IngestOp::Write(ref w1), IngestOp::Write(ref w1),
IngestOp::Write(ref w2), IngestOp::Write(ref w2),
IngestOp::Write(ref w3) IngestOp::Write(ref w3),
] => { ] => {
assert_write_ops_eq(w1.clone(), op1); assert_write_ops_eq(w1.clone(), op1);
assert_write_ops_eq(w2.clone(), op2); assert_write_ops_eq(w2.clone(), op2);
@ -493,9 +517,16 @@ mod tests {
let ops = metrics let ops = metrics
.get_instrument::<Metric<U64Counter>>("ingester_wal_replay_ops") .get_instrument::<Metric<U64Counter>>("ingester_wal_replay_ops")
.expect("file counter not found") .expect("file counter not found")
.get_observer(&Attributes::from([])) .get_observer(&Attributes::from(&[("outcome", "success")]))
.expect("attributes not found") .expect("attributes not found")
.fetch(); .fetch();
assert_eq!(ops, 3); assert_eq!(ops, 3);
let ops = metrics
.get_instrument::<Metric<U64Counter>>("ingester_wal_replay_ops")
.expect("file counter not found")
.get_observer(&Attributes::from(&[("outcome", "skipped_empty")]))
.expect("attributes not found")
.fetch();
assert_eq!(ops, 1);
} }
} }

View File

@ -23,7 +23,7 @@ rand = { version = "0.8.3", features = ["small_rng"] }
regex = "1.9" regex = "1.9"
schema = { path = "../schema" } schema = { path = "../schema" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.103" serde_json = "1.0.104"
snafu = "0.7" snafu = "0.7"
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
toml = "0.7.6" toml = "0.7.6"

View File

@ -20,7 +20,7 @@ predicate = { path = "../predicate" }
query_functions = { path = "../query_functions" } query_functions = { path = "../query_functions" }
regex = "1" regex = "1"
schema = { path = "../schema" } schema = { path = "../schema" }
serde_json = "1.0.103" serde_json = "1.0.104"
thiserror = "1.0" thiserror = "1.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" } workspace-hack = { version = "0.1", path = "../workspace-hack" }

View File

@ -38,7 +38,7 @@ log = "0.4"
parking_lot = "0.12" parking_lot = "0.12"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.103" serde_json = "1.0.104"
serde_urlencoded = "0.7.0" serde_urlencoded = "0.7.0"
snafu = "0.7" snafu = "0.7"
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }

View File

@ -240,6 +240,7 @@ pub async fn create_router_server_type(
ingester_connections, ingester_connections,
router_config.rpc_write_replicas, router_config.rpc_write_replicas,
&metrics, &metrics,
router_config.rpc_write_health_error_window_seconds,
); );
let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer); let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);

View File

@ -130,15 +130,16 @@ impl<T> RpcWrite<T> {
endpoints: impl IntoIterator<Item = (T, N)>, endpoints: impl IntoIterator<Item = (T, N)>,
n_copies: NonZeroUsize, n_copies: NonZeroUsize,
metrics: &metric::Registry, metrics: &metric::Registry,
error_window: Duration,
) -> Self ) -> Self
where where
T: Send + Sync + Debug + 'static, T: Send + Sync + Debug + 'static,
N: Into<Arc<str>>, N: Into<Arc<str>>,
{ {
let endpoints = Balancer::new( let endpoints = Balancer::new(
endpoints endpoints.into_iter().map(|(client, name)| {
.into_iter() CircuitBreakingClient::new(client, name.into(), error_window)
.map(|(client, name)| CircuitBreakingClient::new(client, name.into())), }),
Some(metrics), Some(metrics),
); );
@ -383,6 +384,7 @@ mod tests {
const NAMESPACE_NAME: &str = "bananas"; const NAMESPACE_NAME: &str = "bananas";
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
const ARBITRARY_TEST_ERROR_WINDOW: Duration = Duration::from_secs(5);
// Start a new `NamespaceSchema` with only the given ID; the rest of the fields are arbitrary. // Start a new `NamespaceSchema` with only the given ID; the rest of the fields are arbitrary.
fn new_empty_namespace_schema() -> Arc<NamespaceSchema> { fn new_empty_namespace_schema() -> Arc<NamespaceSchema> {
@ -460,6 +462,7 @@ mod tests {
[(Arc::clone(&client), "mock client")], [(Arc::clone(&client), "mock client")],
1.try_into().unwrap(), 1.try_into().unwrap(),
&metric::Registry::default(), &metric::Registry::default(),
ARBITRARY_TEST_ERROR_WINDOW,
); );
// Drive the RPC writer // Drive the RPC writer
@ -522,6 +525,7 @@ mod tests {
], ],
1.try_into().unwrap(), 1.try_into().unwrap(),
&metric::Registry::default(), &metric::Registry::default(),
ARBITRARY_TEST_ERROR_WINDOW,
); );
// Drive the RPC writer // Drive the RPC writer
@ -590,6 +594,7 @@ mod tests {
], ],
1.try_into().unwrap(), 1.try_into().unwrap(),
&metric::Registry::default(), &metric::Registry::default(),
ARBITRARY_TEST_ERROR_WINDOW,
); );
// Drive the RPC writer // Drive the RPC writer
@ -640,7 +645,10 @@ mod tests {
circuit_1.set_healthy(false); circuit_1.set_healthy(false);
let got = make_request( let got = make_request(
[CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)], [
CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW)
.with_circuit_breaker(circuit_1),
],
1, 1,
) )
.await; .await;
@ -661,7 +669,10 @@ mod tests {
circuit_1.set_healthy(true); circuit_1.set_healthy(true);
let got = make_request( let got = make_request(
[CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)], [
CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW)
.with_circuit_breaker(circuit_1),
],
1, 1,
) )
.await; .await;
@ -682,7 +693,10 @@ mod tests {
circuit_1.set_healthy(true); circuit_1.set_healthy(true);
let got = make_request( let got = make_request(
[CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)], [
CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW)
.with_circuit_breaker(circuit_1),
],
1, 1,
) )
.await; .await;
@ -714,8 +728,10 @@ mod tests {
let got = make_request( let got = make_request(
[ [
CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1), CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW)
CircuitBreakingClient::new(client_2, "client_2").with_circuit_breaker(circuit_2), .with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(client_2, "client_2", ARBITRARY_TEST_ERROR_WINDOW)
.with_circuit_breaker(circuit_2),
], ],
2, // 2 copies required 2, // 2 copies required
) )
@ -738,10 +754,18 @@ mod tests {
let got = make_request( let got = make_request(
[ [
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") CircuitBreakingClient::new(
.with_circuit_breaker(circuit_1), Arc::clone(&client_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") "client_1",
.with_circuit_breaker(circuit_2), ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(
Arc::clone(&client_2),
"client_2",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_2),
], ],
2, // 2 copies required 2, // 2 copies required
) )
@ -774,10 +798,18 @@ mod tests {
circuit_2.set_healthy(true); circuit_2.set_healthy(true);
let mut clients = vec![ let mut clients = vec![
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") CircuitBreakingClient::new(
.with_circuit_breaker(circuit_1), Arc::clone(&client_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") "client_1",
.with_circuit_breaker(circuit_2), ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(
Arc::clone(&client_2),
"client_2",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_2),
]; ];
// The order should never affect the outcome. // The order should never affect the outcome.
@ -820,10 +852,18 @@ mod tests {
let got = make_request( let got = make_request(
[ [
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") CircuitBreakingClient::new(
.with_circuit_breaker(circuit_1), Arc::clone(&client_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") "client_1",
.with_circuit_breaker(circuit_2), ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(
Arc::clone(&client_2),
"client_2",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_2),
], ],
2, // 2 copies required 2, // 2 copies required
) )
@ -876,12 +916,24 @@ mod tests {
circuit_3.set_healthy(true); circuit_3.set_healthy(true);
let mut clients = vec![ let mut clients = vec![
CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") CircuitBreakingClient::new(
.with_circuit_breaker(circuit_1), Arc::clone(&client_1),
CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") "client_1",
.with_circuit_breaker(circuit_2), ARBITRARY_TEST_ERROR_WINDOW,
CircuitBreakingClient::new(Arc::clone(&client_3), "client_3") )
.with_circuit_breaker(circuit_3), .with_circuit_breaker(circuit_1),
CircuitBreakingClient::new(
Arc::clone(&client_2),
"client_2",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_2),
CircuitBreakingClient::new(
Arc::clone(&client_3),
"client_3",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(circuit_3),
]; ];
// The order should never affect the outcome. // The order should never affect the outcome.
@ -983,7 +1035,7 @@ mod tests {
async move { async move {
let endpoints = upstreams.into_iter() let endpoints = upstreams.into_iter()
.map(|(circuit, client)| { .map(|(circuit, client)| {
CircuitBreakingClient::new(client, "bananas") CircuitBreakingClient::new(client, "bananas",ARBITRARY_TEST_ERROR_WINDOW)
.with_circuit_breaker(circuit) .with_circuit_breaker(circuit)
}); });

View File

@ -219,6 +219,8 @@ mod tests {
use super::*; use super::*;
const ARBITRARY_TEST_ERROR_WINDOW: Duration = Duration::from_secs(5);
/// No healthy nodes prevents a snapshot from being returned. /// No healthy nodes prevents a snapshot from being returned.
#[tokio::test] #[tokio::test]
async fn test_balancer_empty_iter() { async fn test_balancer_empty_iter() {
@ -227,16 +229,22 @@ mod tests {
let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); let circuit_err_1 = Arc::new(MockCircuitBreaker::default());
circuit_err_1.set_healthy(false); circuit_err_1.set_healthy(false);
circuit_err_1.set_should_probe(false); circuit_err_1.set_should_probe(false);
let client_err_1 = let client_err_1 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_1)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_1));
let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); let circuit_err_2 = Arc::new(MockCircuitBreaker::default());
circuit_err_2.set_healthy(false); circuit_err_2.set_healthy(false);
circuit_err_2.set_should_probe(false); circuit_err_2.set_should_probe(false);
let client_err_2 = let client_err_2 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_2)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_2));
assert_eq!(circuit_err_1.ok_count(), 0); assert_eq!(circuit_err_1.ok_count(), 0);
assert_eq!(circuit_err_2.ok_count(), 0); assert_eq!(circuit_err_2.ok_count(), 0);
@ -254,21 +262,31 @@ mod tests {
let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); let circuit_err_1 = Arc::new(MockCircuitBreaker::default());
circuit_err_1.set_healthy(false); circuit_err_1.set_healthy(false);
circuit_err_1.set_should_probe(true); circuit_err_1.set_should_probe(true);
let client_err_1 = let client_err_1 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_1)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_1));
let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); let circuit_err_2 = Arc::new(MockCircuitBreaker::default());
circuit_err_2.set_healthy(false); circuit_err_2.set_healthy(false);
circuit_err_2.set_should_probe(true); circuit_err_2.set_should_probe(true);
let client_err_2 = let client_err_2 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_2)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_2));
let circuit_ok = Arc::new(MockCircuitBreaker::default()); let circuit_ok = Arc::new(MockCircuitBreaker::default());
circuit_ok.set_healthy(true); circuit_ok.set_healthy(true);
circuit_ok.set_should_probe(false); circuit_ok.set_should_probe(false);
let client_ok = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") let client_ok = CircuitBreakingClient::new(
.with_circuit_breaker(Arc::clone(&circuit_ok)); Arc::new(MockWriteClient::default()),
"bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_ok));
let balancer = Balancer::new([client_err_1, client_err_2, client_ok], None); let balancer = Balancer::new([client_err_1, client_err_2, client_ok], None);
@ -323,21 +341,31 @@ mod tests {
let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); let circuit_err_1 = Arc::new(MockCircuitBreaker::default());
circuit_err_1.set_healthy(false); circuit_err_1.set_healthy(false);
circuit_err_1.set_should_probe(false); circuit_err_1.set_should_probe(false);
let client_err_1 = let client_err_1 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_1)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_1));
let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); let circuit_err_2 = Arc::new(MockCircuitBreaker::default());
circuit_err_2.set_healthy(false); circuit_err_2.set_healthy(false);
circuit_err_2.set_should_probe(false); circuit_err_2.set_should_probe(false);
let client_err_2 = let client_err_2 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_2)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_2));
let circuit_ok = Arc::new(MockCircuitBreaker::default()); let circuit_ok = Arc::new(MockCircuitBreaker::default());
circuit_ok.set_healthy(true); circuit_ok.set_healthy(true);
let client_ok = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") let client_ok = CircuitBreakingClient::new(
.with_circuit_breaker(Arc::clone(&circuit_ok)); Arc::new(MockWriteClient::default()),
"bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_ok));
assert_eq!(circuit_ok.ok_count(), 0); assert_eq!(circuit_ok.ok_count(), 0);
assert_eq!(circuit_err_1.ok_count(), 0); assert_eq!(circuit_err_1.ok_count(), 0);
@ -383,8 +411,12 @@ mod tests {
let circuit = Arc::new(MockCircuitBreaker::default()); let circuit = Arc::new(MockCircuitBreaker::default());
circuit.set_healthy(false); circuit.set_healthy(false);
circuit.set_should_probe(false); circuit.set_should_probe(false);
let client = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") let client = CircuitBreakingClient::new(
.with_circuit_breaker(Arc::clone(&circuit)); Arc::new(MockWriteClient::default()),
"bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit));
assert_eq!(circuit.ok_count(), 0); assert_eq!(circuit.ok_count(), 0);
@ -432,21 +464,30 @@ mod tests {
// two returns a healthy state, one is unhealthy. // two returns a healthy state, one is unhealthy.
let circuit_err = Arc::new(MockCircuitBreaker::default()); let circuit_err = Arc::new(MockCircuitBreaker::default());
circuit_err.set_healthy(false); circuit_err.set_healthy(false);
let client_err = let client_err = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err));
let circuit_ok_1 = Arc::new(MockCircuitBreaker::default()); let circuit_ok_1 = Arc::new(MockCircuitBreaker::default());
circuit_ok_1.set_healthy(true); circuit_ok_1.set_healthy(true);
let client_ok_1 = let client_ok_1 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_ok_1)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_ok_1));
let circuit_ok_2 = Arc::new(MockCircuitBreaker::default()); let circuit_ok_2 = Arc::new(MockCircuitBreaker::default());
circuit_ok_2.set_healthy(true); circuit_ok_2.set_healthy(true);
let client_ok_2 = let client_ok_2 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_ok_2)); "bananas",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_ok_2));
let balancer = Balancer::new([client_err, client_ok_1, client_ok_2], None); let balancer = Balancer::new([client_err, client_ok_1, client_ok_2], None);
@ -478,22 +519,31 @@ mod tests {
let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); let circuit_err_1 = Arc::new(MockCircuitBreaker::default());
circuit_err_1.set_healthy(false); circuit_err_1.set_healthy(false);
circuit_err_1.set_should_probe(false); circuit_err_1.set_should_probe(false);
let client_err_1 = let client_err_1 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bad-client-1") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_1)); "bad-client-1",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_1));
let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); let circuit_err_2 = Arc::new(MockCircuitBreaker::default());
circuit_err_2.set_healthy(false); circuit_err_2.set_healthy(false);
circuit_err_2.set_should_probe(true); circuit_err_2.set_should_probe(true);
let client_err_2 = let client_err_2 = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bad-client-2") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err_2)); "bad-client-2",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err_2));
let circuit_ok = Arc::new(MockCircuitBreaker::default()); let circuit_ok = Arc::new(MockCircuitBreaker::default());
circuit_ok.set_healthy(true); circuit_ok.set_healthy(true);
let client_ok = let client_ok = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "ok-client") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_ok)); "ok-client",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_ok));
let balancer = Balancer::new([client_err_1, client_err_2, client_ok], None); let balancer = Balancer::new([client_err_1, client_err_2, client_ok], None);
@ -558,9 +608,12 @@ mod tests {
let circuit_err = Arc::new(MockCircuitBreaker::default()); let circuit_err = Arc::new(MockCircuitBreaker::default());
circuit_err.set_healthy(false); circuit_err.set_healthy(false);
circuit_err.set_should_probe(false); circuit_err.set_should_probe(false);
let client_err = let client_err = CircuitBreakingClient::new(
CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bad-client-1") Arc::new(MockWriteClient::default()),
.with_circuit_breaker(Arc::clone(&circuit_err)); "bad-client-1",
ARBITRARY_TEST_ERROR_WINDOW,
)
.with_circuit_breaker(Arc::clone(&circuit_err));
let balancer = Balancer::new([client_err], None); let balancer = Balancer::new([client_err], None);
assert!(balancer.endpoints().is_none()); assert!(balancer.endpoints().is_none());

View File

@ -16,13 +16,10 @@ use tokio::{
}; };
/// The limit on the ratio of the number of error requests to the number of /// The limit on the ratio of the number of error requests to the number of
/// successful requests within an [`ERROR_WINDOW`] interval to be considered /// successful requests within the configured error window to be considered
/// healthy. /// healthy. If updating this value, remember to update the documentation
/// in the CLI flag for the configurable error window.
const MAX_ERROR_RATIO: f32 = 0.8; const MAX_ERROR_RATIO: f32 = 0.8;
/// The (discrete) slices of time in which the error ratio must exceed
/// [`MAX_ERROR_RATIO`] to cause the [`CircuitBreaker`] to transition to the
/// unhealthy state.
const ERROR_WINDOW: Duration = Duration::from_secs(5);
/// The maximum number of probe requests to allow when in an unhealthy state. /// The maximum number of probe requests to allow when in an unhealthy state.
const NUM_PROBES: u64 = 10; const NUM_PROBES: u64 = 10;
/// The length of time during which up to [`NUM_PROBES`] are allowed when in an /// The length of time during which up to [`NUM_PROBES`] are allowed when in an
@ -64,7 +61,7 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1);
/// # Implementation /// # Implementation
/// ///
/// The circuit breaker is considered unhealthy when 80% ([`MAX_ERROR_RATIO`]) /// The circuit breaker is considered unhealthy when 80% ([`MAX_ERROR_RATIO`])
/// of requests within a 5 second window [`ERROR_WINDOW`] fail. The breaker /// of requests within the configured error window fail. The breaker
/// becomes healthy again when the error rate falls below 80% /// becomes healthy again when the error rate falls below 80%
/// ([`MAX_ERROR_RATIO`]) for the, at most, 10 probe requests ([`NUM_PROBES`]) /// ([`MAX_ERROR_RATIO`]) for the, at most, 10 probe requests ([`NUM_PROBES`])
/// allowed through within 1 second ([`PROBE_INTERVAL`]). /// allowed through within 1 second ([`PROBE_INTERVAL`]).
@ -80,7 +77,7 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1);
/// breaker is considered healthy when the ratio of the number of error /// breaker is considered healthy when the ratio of the number of error
/// requests to the number of successful requests in the current window is /// requests to the number of successful requests in the current window is
/// less than [`MAX_ERROR_RATIO`]. If the ratio of errors exceeds /// less than [`MAX_ERROR_RATIO`]. If the ratio of errors exceeds
/// [`MAX_ERROR_RATIO`] within a single [`ERROR_WINDOW`] duration of time, /// [`MAX_ERROR_RATIO`] within a single error window,
/// the circuit breaker is then considered to be in the "open/unhealthy" /// the circuit breaker is then considered to be in the "open/unhealthy"
/// state. /// state.
/// ///
@ -102,12 +99,12 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1);
/// ///
/// Successful requests and errors are recorded when passed to /// Successful requests and errors are recorded when passed to
/// [`CircuitBreaker::observe()`]. These counters are reset at intervals of /// [`CircuitBreaker::observe()`]. These counters are reset at intervals of
/// [`ERROR_WINDOW`], meaning that the ratio of errors must exceed /// the configured error window, meaning that the ratio of errors must exceed
/// [`MAX_ERROR_RATIO`] within a single window to open the circuit breaker to /// [`MAX_ERROR_RATIO`] within a single window to open the circuit breaker to
/// start being considered unhealthy. /// start being considered unhealthy.
/// ///
/// A floor of at least `MAX_ERROR_RATIO * NUM_PROBES` must be observed per /// A floor of at least [`MAX_ERROR_RATIO`] * [`NUM_PROBES`] must be observed per
/// [`ERROR_WINDOW`] before the circuit breaker opens / becomes unhealthy. /// error window before the circuit breaker opens / becomes unhealthy.
/// ///
/// Error ratios are measured on every call to [`CircuitBreaker::is_healthy`], /// Error ratios are measured on every call to [`CircuitBreaker::is_healthy`],
/// which should be done before determining whether to perform each request. /// which should be done before determining whether to perform each request.
@ -121,8 +118,8 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1);
/// ///
/// ## Probing / Closing (becoming healthy) /// ## Probing / Closing (becoming healthy)
/// ///
/// Once a circuit breaker transitions to "open/unhealthy", up to [`NUM_PROBES`] /// Once a circuit breaker transitions to "open/unhealthy", up to 10 [`NUM_PROBES`]
/// requests are allowed per [`PROBE_INTERVAL`], as determined by calling /// requests are allowed per 1s [`PROBE_INTERVAL`], as determined by calling
/// [`CircuitBreaker::should_probe`] before sending a request. This is referred /// [`CircuitBreaker::should_probe`] before sending a request. This is referred
/// to as "probing", allowing the client to discover the state of the /// to as "probing", allowing the client to discover the state of the
/// (potentially unavailable) remote while bounding the number of requests that /// (potentially unavailable) remote while bounding the number of requests that
@ -145,7 +142,7 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1);
#[derive(Debug)] #[derive(Debug)]
pub struct CircuitBreaker { pub struct CircuitBreaker {
/// Counters tracking the number of [`Ok`] and [`Err`] observed in the /// Counters tracking the number of [`Ok`] and [`Err`] observed in the
/// current [`ERROR_WINDOW`]. /// current error window.
/// ///
/// When the total number of requests ([`RequestCounterValue::total()`]) is /// When the total number of requests ([`RequestCounterValue::total()`]) is
/// less than [`NUM_PROBES`], the circuit is in the "probing" regime. When /// less than [`NUM_PROBES`], the circuit is in the "probing" regime. When
@ -157,7 +154,7 @@ pub struct CircuitBreaker {
/// should be allowed and resetting the [`PROBE_INTERVAL`]. /// should be allowed and resetting the [`PROBE_INTERVAL`].
probes: Mutex<ProbeState>, probes: Mutex<ProbeState>,
/// A task to reset the request count at intervals of [`ERROR_WINDOW`]. /// A task to reset the request count at the configured error window.
reset_task: JoinHandle<()>, reset_task: JoinHandle<()>,
/// A string description of the endpoint this [`CircuitBreaker`] models. /// A string description of the endpoint this [`CircuitBreaker`] models.
@ -178,13 +175,13 @@ struct ProbeState {
} }
impl CircuitBreaker { impl CircuitBreaker {
pub(crate) fn new(endpoint: impl Into<Arc<str>>) -> Self { pub(crate) fn new(endpoint: impl Into<Arc<str>>, error_window: Duration) -> Self {
let requests = Arc::new(RequestCounter::default()); let requests = Arc::new(RequestCounter::default());
let s = Self { let s = Self {
requests: Arc::clone(&requests), requests: Arc::clone(&requests),
probes: Mutex::new(ProbeState::default()), probes: Mutex::new(ProbeState::default()),
reset_task: tokio::spawn(async move { reset_task: tokio::spawn(async move {
let mut ticker = tokio::time::interval(ERROR_WINDOW); let mut ticker = tokio::time::interval(error_window);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop { loop {
ticker.tick().await; ticker.tick().await;
@ -366,7 +363,7 @@ fn is_healthy(counts: RequestCounterValue) -> bool {
/// Resets the absolute request counter values if the current circuit state is /// Resets the absolute request counter values if the current circuit state is
/// "closed" (healthy, not probing) at the time of the call, such that there /// "closed" (healthy, not probing) at the time of the call, such that there
/// must be NUM_PROBES * MAX_ERROR_RATIO number of failed requests to open the /// must be [`NUM_PROBES`] * [`MAX_ERROR_RATIO`] number of failed requests to open the
/// circuit (mark as unhealthy). /// circuit (mark as unhealthy).
/// ///
/// Retains the closed/healthy state of the circuit. This is NOT an atomic /// Retains the closed/healthy state of the circuit. This is NOT an atomic
@ -466,7 +463,7 @@ mod tests {
/// Return a new [`CircuitBreaker`] with the reset ticker disabled. /// Return a new [`CircuitBreaker`] with the reset ticker disabled.
fn new_no_reset() -> CircuitBreaker { fn new_no_reset() -> CircuitBreaker {
let c = CircuitBreaker::new("bananas"); let c = CircuitBreaker::new("bananas", Duration::from_secs(5));
c.reset_task.abort(); c.reset_task.abort();
c c
} }
@ -599,13 +596,13 @@ mod tests {
} }
/// The circuit is marked unhealthy if the error rate exceeds /// The circuit is marked unhealthy if the error rate exceeds
/// MAX_ERROR_RATIO within a single ERROR_WINDOW (approximately). /// MAX_ERROR_RATIO within a single error window (approximately).
/// ///
/// This test ensures the counter reset logic prevents errors from different /// This test ensures the counter reset logic prevents errors from different
/// ERROR_WINDOW periods from changing the circuit to open/unhealthy. /// error window periods from changing the circuit to open/unhealthy.
#[tokio::test] #[tokio::test]
async fn test_periodic_counter_reset() { async fn test_periodic_counter_reset() {
let c = CircuitBreaker::new("bananas"); let c = CircuitBreaker::new("bananas", Duration::from_secs(5));
// Assert the circuit breaker as healthy. // Assert the circuit breaker as healthy.
assert!(c.is_healthy()); assert!(c.is_healthy());

View File

@ -1,4 +1,4 @@
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::WriteRequest; use generated_types::influxdata::iox::ingester::v1::WriteRequest;
@ -51,9 +51,13 @@ pub(super) struct CircuitBreakingClient<T, C = CircuitBreaker> {
} }
impl<T> CircuitBreakingClient<T> { impl<T> CircuitBreakingClient<T> {
pub(super) fn new(inner: T, endpoint_name: impl Into<Arc<str>>) -> Self { pub(super) fn new(
inner: T,
endpoint_name: impl Into<Arc<str>>,
error_window: Duration,
) -> Self {
let endpoint_name = endpoint_name.into(); let endpoint_name = endpoint_name.into();
let state = CircuitBreaker::new(Arc::clone(&endpoint_name)); let state = CircuitBreaker::new(Arc::clone(&endpoint_name), error_window);
state.set_healthy(); state.set_healthy();
Self { Self {
inner, inner,
@ -181,8 +185,12 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_healthy() { async fn test_healthy() {
let circuit_breaker = Arc::new(MockCircuitBreaker::default()); let circuit_breaker = Arc::new(MockCircuitBreaker::default());
let wrapper = CircuitBreakingClient::new(MockWriteClient::default(), "bananas") let wrapper = CircuitBreakingClient::new(
.with_circuit_breaker(Arc::clone(&circuit_breaker)); MockWriteClient::default(),
"bananas",
Duration::from_secs(5),
)
.with_circuit_breaker(Arc::clone(&circuit_breaker));
circuit_breaker.set_healthy(true); circuit_breaker.set_healthy(true);
assert_eq!(wrapper.is_healthy(), circuit_breaker.is_healthy()); assert_eq!(wrapper.is_healthy(), circuit_breaker.is_healthy());
@ -213,8 +221,9 @@ mod tests {
.into_iter(), .into_iter(),
)), )),
); );
let wrapper = CircuitBreakingClient::new(Arc::clone(&mock_client), "bananas") let wrapper =
.with_circuit_breaker(Arc::clone(&circuit_breaker)); CircuitBreakingClient::new(Arc::clone(&mock_client), "bananas", Duration::from_secs(5))
.with_circuit_breaker(Arc::clone(&circuit_breaker));
assert_eq!(circuit_breaker.ok_count(), 0); assert_eq!(circuit_breaker.ok_count(), 0);
assert_eq!(circuit_breaker.err_count(), 0); assert_eq!(circuit_breaker.err_count(), 0);

View File

@ -34,6 +34,7 @@ pub const TEST_RETENTION_PERIOD: Duration = Duration::from_secs(3600);
pub struct TestContextBuilder { pub struct TestContextBuilder {
namespace_autocreation: MissingNamespaceAction, namespace_autocreation: MissingNamespaceAction,
single_tenancy: bool, single_tenancy: bool,
rpc_write_error_window: Duration,
} }
impl Default for TestContextBuilder { impl Default for TestContextBuilder {
@ -41,6 +42,7 @@ impl Default for TestContextBuilder {
Self { Self {
namespace_autocreation: MissingNamespaceAction::Reject, namespace_autocreation: MissingNamespaceAction::Reject,
single_tenancy: false, single_tenancy: false,
rpc_write_error_window: Duration::from_secs(5),
} }
} }
} }
@ -79,6 +81,7 @@ impl TestContextBuilder {
self.single_tenancy, self.single_tenancy,
catalog, catalog,
metrics, metrics,
self.rpc_write_error_window,
) )
.await .await
} }
@ -94,6 +97,7 @@ pub struct TestContext {
namespace_autocreation: MissingNamespaceAction, namespace_autocreation: MissingNamespaceAction,
single_tenancy: bool, single_tenancy: bool,
rpc_write_error_window: Duration,
} }
// This mass of words is certainly a downside of chained handlers. // This mass of words is certainly a downside of chained handlers.
@ -133,12 +137,14 @@ impl TestContext {
single_tenancy: bool, single_tenancy: bool,
catalog: Arc<dyn Catalog>, catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
rpc_write_error_window: Duration,
) -> Self { ) -> Self {
let client = Arc::new(MockWriteClient::default()); let client = Arc::new(MockWriteClient::default());
let rpc_writer = RpcWrite::new( let rpc_writer = RpcWrite::new(
[(Arc::clone(&client), "mock client")], [(Arc::clone(&client), "mock client")],
1.try_into().unwrap(), 1.try_into().unwrap(),
&metrics, &metrics,
rpc_write_error_window,
); );
let ns_cache = Arc::new(ReadThroughCache::new( let ns_cache = Arc::new(ReadThroughCache::new(
@ -195,6 +201,7 @@ impl TestContext {
namespace_autocreation, namespace_autocreation,
single_tenancy, single_tenancy,
rpc_write_error_window,
} }
} }
@ -208,6 +215,7 @@ impl TestContext {
self.single_tenancy, self.single_tenancy,
catalog, catalog,
metrics, metrics,
self.rpc_write_error_window,
) )
.await .await
} }

View File

@ -26,7 +26,7 @@ bytes = "1.4"
futures = "0.3" futures = "0.3"
prost = "0.11" prost = "0.11"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.103" serde_json = "1.0.104"
snafu = "0.7" snafu = "0.7"
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = { workspace = true } tonic = { workspace = true }

View File

@ -30,7 +30,7 @@ pin-project = "1.1"
prost = "0.11" prost = "0.11"
regex = "1.9.1" regex = "1.9.1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.103" serde_json = "1.0.104"
snafu = "0.7" snafu = "0.7"
tokio-stream = { version = "0.1", features = ["net"] } tokio-stream = { version = "0.1", features = ["net"] }
tonic = { workspace = true } tonic = { workspace = true }

View File

@ -19,7 +19,7 @@ tokio = { version = "1.29", features = ["macros", "parking_lot", "sync", "time"]
tokio-util = { version = "0.7.8" } tokio-util = { version = "0.7.8" }
trace = { path = "../trace"} trace = { path = "../trace"}
workspace-hack = { version = "0.1", path = "../workspace-hack" } workspace-hack = { version = "0.1", path = "../workspace-hack" }
sysinfo = "0.29.6" sysinfo = "0.29.7"
[dev-dependencies] [dev-dependencies]
tempfile = "3.7.0" tempfile = "3.7.0"