diff --git a/Cargo.lock b/Cargo.lock index dbff294ff2..a466054a68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -981,6 +981,7 @@ dependencies = [ "metric", "object_store", "observability_deps", + "parking_lot", "parquet_file", "rand", "schema", @@ -4184,9 +4185,9 @@ checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" [[package]] name = "pprof" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b90f8560ad8bd57b207b8293bc5226e48e89039a6e590c12a297d91b84c7e60" +checksum = "978385d59daf9269189d052ca8a84c1acfd0715c0599a5d5188d4acc078ca46a" dependencies = [ "backtrace", "cfg-if", @@ -4905,18 +4906,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.175" +version = "1.0.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d25439cd7397d044e2748a6fe2432b5e85db703d6d097bd014b3c0ad1ebff0b" +checksum = "76dc28c9523c5d70816e393136b86d48909cfb27cecaa902d338c19ed47164dc" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.175" +version = "1.0.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4" +checksum = "a4e7b8c5dc823e3b90651ff1d3808419cd14e5ad76de04feaf37da114e7a306f" dependencies = [ "proc-macro2", "quote", @@ -4925,9 +4926,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.103" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" +checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" dependencies = [ "itoa", "ryu", @@ -5690,9 +5691,9 @@ checksum = "d3543ca0810e71767052bdcdd5653f23998b192642a22c5164bfa6581e40a4a2" [[package]] name = "sysinfo" -version = "0.29.6" +version = "0.29.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7cb97a5a85a136d84e75d5c3cf89655090602efb1be0d8d5337b7e386af2908" +checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db" dependencies = [ "cfg-if", "core-foundation-sys", diff --git a/clap_blocks/src/router.rs b/clap_blocks/src/router.rs index 977d37b29c..6f43eef8cd 100644 --- a/clap_blocks/src/router.rs +++ b/clap_blocks/src/router.rs @@ -125,6 +125,18 @@ pub struct RouterConfig { default_value = "1" )] pub rpc_write_replicas: NonZeroUsize, + + /// Specify the (discrete) slices of time in which the router's write + /// request failures must exceed the write client's maximum error ratio of + /// 80% for a downstream RPC write handler to be considered in the unhealthy + /// state. + #[clap( + long = "rpc-write-health-error-window-seconds", + env = "INFLUXDB_IOX_RPC_WRITE_HEALTH_ERROR_WINDOW_SECONDS", + default_value = "5", + value_parser = parse_duration + )] + pub rpc_write_health_error_window_seconds: Duration, } /// Map a string containing an integer number of seconds into a [`Duration`]. diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml index 5fa067063d..a1621e0b59 100644 --- a/compactor/Cargo.toml +++ b/compactor/Cargo.toml @@ -30,6 +30,7 @@ trace = { version = "0.1.0", path = "../trace" } tracker = { path = "../tracker" } uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } +parking_lot = "0.12.1" [dev-dependencies] arrow_util = { path = "../arrow_util" } diff --git a/compactor/src/components/hardcoded.rs b/compactor/src/components/hardcoded.rs index 26ee183a38..7391d97850 100644 --- a/compactor/src/components/hardcoded.rs +++ b/compactor/src/components/hardcoded.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration}; -use compactor_scheduler::{create_scheduler, PartitionsSource, Scheduler}; +use compactor_scheduler::{create_scheduler, Scheduler}; use data_types::CompactionLevel; use object_store::memory::InMemory; @@ -64,7 +64,7 @@ use super::{ logging::LoggingPartitionsSourceWrapper, metrics::MetricsPartitionsSourceWrapper, not_empty::NotEmptyPartitionsSourceWrapper, randomize_order::RandomizeOrderPartitionsSourcesWrapper, - scheduled::ScheduledPartitionsSource, + scheduled::ScheduledPartitionsSource, PartitionsSource, }, post_classification_partition_filter::{ logging::LoggingPostClassificationFilterWrapper, diff --git a/compactor/src/components/partition_stream/endless.rs b/compactor/src/components/partition_stream/endless.rs index f9e53ad559..75fdfd1407 100644 --- a/compactor/src/components/partition_stream/endless.rs +++ b/compactor/src/components/partition_stream/endless.rs @@ -1,10 +1,11 @@ use std::{collections::VecDeque, fmt::Display, sync::Arc}; -use compactor_scheduler::PartitionsSource; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use futures::{stream::BoxStream, StreamExt}; -use super::super::partition_files_source::rate_limit::RateLimit; +use super::super::{ + partition_files_source::rate_limit::RateLimit, partitions_source::PartitionsSource, +}; use super::PartitionStream; #[derive(Debug)] @@ -41,7 +42,7 @@ impl PartitionStream for EndlessPartititionStream where T: PartitionsSource, { - fn stream(&self) -> BoxStream<'_, PartitionId> { + fn stream(&self) -> BoxStream<'_, CompactionJob> { let source = Arc::clone(&self.source); // Note: we use a VecDeque as a buffer so we can preserve the order and cheaply remove the first element without @@ -79,9 +80,9 @@ where #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; + use data_types::PartitionId; - use super::*; + use super::{super::super::partitions_source::mock::MockPartitionsSource, *}; #[test] fn test_display() { @@ -92,9 +93,9 @@ mod tests { #[tokio::test] async fn test_stream() { let ids = vec![ - PartitionId::new(1), - PartitionId::new(3), - PartitionId::new(2), + CompactionJob::new(PartitionId::new(1)), + CompactionJob::new(PartitionId::new(3)), + CompactionJob::new(PartitionId::new(2)), ]; let stream = EndlessPartititionStream::new(MockPartitionsSource::new(ids.clone())); @@ -103,13 +104,7 @@ mod tests { // we need to limit the stream at one point because it is endless assert_eq!( stream.stream().take(5).collect::>().await, - vec![ - PartitionId::new(1), - PartitionId::new(3), - PartitionId::new(2), - PartitionId::new(1), - PartitionId::new(3) - ], + [&ids[..], &ids[..2]].concat(), ); } } diff --git a/compactor/src/components/partition_stream/mod.rs b/compactor/src/components/partition_stream/mod.rs index 35fa26ffc2..690f7bf20f 100644 --- a/compactor/src/components/partition_stream/mod.rs +++ b/compactor/src/components/partition_stream/mod.rs @@ -1,6 +1,6 @@ use std::fmt::{Debug, Display}; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use futures::stream::BoxStream; pub mod endless; @@ -8,8 +8,8 @@ pub mod once; /// Source for partitions. pub trait PartitionStream: Debug + Display + Send + Sync { - /// Create new source stream of partitions. + /// Create new source stream of compaction job. /// /// This stream may be endless. - fn stream(&self) -> BoxStream<'_, PartitionId>; + fn stream(&self) -> BoxStream<'_, CompactionJob>; } diff --git a/compactor/src/components/partition_stream/once.rs b/compactor/src/components/partition_stream/once.rs index 0a4c44d2b8..76520b95b0 100644 --- a/compactor/src/components/partition_stream/once.rs +++ b/compactor/src/components/partition_stream/once.rs @@ -1,10 +1,9 @@ use std::{fmt::Display, sync::Arc}; -use compactor_scheduler::PartitionsSource; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use futures::{stream::BoxStream, StreamExt}; -use super::PartitionStream; +use super::{super::partitions_source::PartitionsSource, PartitionStream}; #[derive(Debug)] pub struct OncePartititionStream @@ -38,7 +37,7 @@ impl PartitionStream for OncePartititionStream where T: PartitionsSource, { - fn stream(&self) -> BoxStream<'_, PartitionId> { + fn stream(&self) -> BoxStream<'_, CompactionJob> { let source = Arc::clone(&self.source); futures::stream::once(async move { futures::stream::iter(source.fetch().await) }) .flatten() @@ -48,9 +47,9 @@ where #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; + use data_types::PartitionId; - use super::*; + use super::{super::super::partitions_source::mock::MockPartitionsSource, *}; #[test] fn test_display() { @@ -61,9 +60,9 @@ mod tests { #[tokio::test] async fn test_stream() { let ids = vec![ - PartitionId::new(1), - PartitionId::new(3), - PartitionId::new(2), + CompactionJob::new(PartitionId::new(1)), + CompactionJob::new(PartitionId::new(3)), + CompactionJob::new(PartitionId::new(2)), ]; let stream = OncePartititionStream::new(MockPartitionsSource::new(ids.clone())); diff --git a/compactor/src/components/partitions_source/logging.rs b/compactor/src/components/partitions_source/logging.rs index 435ac11bb2..541de14893 100644 --- a/compactor/src/components/partitions_source/logging.rs +++ b/compactor/src/components/partitions_source/logging.rs @@ -1,10 +1,11 @@ use std::fmt::Display; use async_trait::async_trait; -use compactor_scheduler::PartitionsSource; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use observability_deps::tracing::{info, warn}; +use super::PartitionsSource; + #[derive(Debug)] pub struct LoggingPartitionsSourceWrapper where @@ -36,7 +37,7 @@ impl PartitionsSource for LoggingPartitionsSourceWrapper where T: PartitionsSource, { - async fn fetch(&self) -> Vec { + async fn fetch(&self) -> Vec { let partitions = self.inner.fetch().await; info!(n_partitions = partitions.len(), "Fetch partitions",); if partitions.is_empty() { @@ -48,10 +49,10 @@ where #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; + use data_types::PartitionId; use test_helpers::tracing::TracingCapture; - use super::*; + use super::{super::mock::MockPartitionsSource, *}; #[test] fn test_display() { @@ -74,9 +75,9 @@ mod tests { #[tokio::test] async fn test_fetch_some() { - let p_1 = PartitionId::new(5); - let p_2 = PartitionId::new(1); - let p_3 = PartitionId::new(12); + let p_1 = CompactionJob::new(PartitionId::new(5)); + let p_2 = CompactionJob::new(PartitionId::new(1)); + let p_3 = CompactionJob::new(PartitionId::new(12)); let partitions = vec![p_1, p_2, p_3]; let source = diff --git a/compactor/src/components/partitions_source/metrics.rs b/compactor/src/components/partitions_source/metrics.rs index bdebc59541..2520dd82eb 100644 --- a/compactor/src/components/partitions_source/metrics.rs +++ b/compactor/src/components/partitions_source/metrics.rs @@ -1,10 +1,11 @@ use std::fmt::Display; use async_trait::async_trait; -use compactor_scheduler::PartitionsSource; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use metric::{Registry, U64Counter}; +use super::PartitionsSource; + const METRIC_NAME_PARTITIONS_FETCH_COUNT: &str = "iox_compactor_partitions_fetch_count"; const METRIC_NAME_PARTITIONS_COUNT: &str = "iox_compactor_partitions_count"; @@ -58,7 +59,7 @@ impl PartitionsSource for MetricsPartitionsSourceWrapper where T: PartitionsSource, { - async fn fetch(&self) -> Vec { + async fn fetch(&self) -> Vec { let partitions = self.inner.fetch().await; self.partitions_fetch_counter.inc(1); self.partitions_counter.inc(partitions.len() as u64); @@ -68,10 +69,10 @@ where #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; + use data_types::PartitionId; use metric::assert_counter; - use super::*; + use super::{super::mock::MockPartitionsSource, *}; #[test] fn test_display() { @@ -85,9 +86,9 @@ mod tests { async fn test_fetch() { let registry = Registry::new(); let partitions = vec![ - PartitionId::new(5), - PartitionId::new(1), - PartitionId::new(12), + CompactionJob::new(PartitionId::new(5)), + CompactionJob::new(PartitionId::new(1)), + CompactionJob::new(PartitionId::new(12)), ]; let source = MetricsPartitionsSourceWrapper::new( MockPartitionsSource::new(partitions.clone()), diff --git a/compactor/src/components/partitions_source/mock.rs b/compactor/src/components/partitions_source/mock.rs new file mode 100644 index 0000000000..d72d7fb11c --- /dev/null +++ b/compactor/src/components/partitions_source/mock.rs @@ -0,0 +1,65 @@ +use async_trait::async_trait; +use compactor_scheduler::CompactionJob; +use parking_lot::Mutex; + +use super::PartitionsSource; + +/// A mock structure for providing [partitions](CompactionJob). +#[derive(Debug)] +pub struct MockPartitionsSource { + partitions: Mutex>, +} + +impl MockPartitionsSource { + #[allow(dead_code)] + /// Create a new MockPartitionsSource. + pub fn new(partitions: Vec) -> Self { + Self { + partitions: Mutex::new(partitions), + } + } + + /// Set CompactionJobs for MockPartitionsSource. + #[allow(dead_code)] // not used anywhere + pub fn set(&self, partitions: Vec) { + *self.partitions.lock() = partitions; + } +} + +impl std::fmt::Display for MockPartitionsSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "mock") + } +} + +#[async_trait] +impl PartitionsSource for MockPartitionsSource { + async fn fetch(&self) -> Vec { + self.partitions.lock().clone() + } +} + +#[cfg(test)] +mod tests { + use data_types::PartitionId; + + use super::*; + + #[test] + fn test_display() { + assert_eq!(MockPartitionsSource::new(vec![]).to_string(), "mock",); + } + + #[tokio::test] + async fn test_fetch() { + let source = MockPartitionsSource::new(vec![]); + assert_eq!(source.fetch().await, vec![],); + + let p_1 = CompactionJob::new(PartitionId::new(5)); + let p_2 = CompactionJob::new(PartitionId::new(1)); + let p_3 = CompactionJob::new(PartitionId::new(12)); + let parts = vec![p_1, p_2, p_3]; + source.set(parts.clone()); + assert_eq!(source.fetch().await, parts,); + } +} diff --git a/compactor/src/components/partitions_source/mod.rs b/compactor/src/components/partitions_source/mod.rs index eb36a073a8..6300db9d18 100644 --- a/compactor/src/components/partitions_source/mod.rs +++ b/compactor/src/components/partitions_source/mod.rs @@ -1,8 +1,38 @@ -//! Abstractions that provide functionality over a [`PartitionsSource`](compactor_scheduler::PartitionsSource) of PartitionIds. +//! Abstractions that provide functionality over a [`PartitionsSource`] of PartitionIds. //! -//! These abstractions are for actions taken in a compactor using the PartitionIds received from a compactor_scheduler. +//! These abstractions are for actions taken in a compactor using the CompactionJobs received from a compactor_scheduler. pub mod logging; pub mod metrics; +pub mod mock; pub mod not_empty; pub mod randomize_order; pub mod scheduled; + +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; + +use async_trait::async_trait; +use compactor_scheduler::CompactionJob; + +/// A source of partitions, noted by [`CompactionJob`](compactor_scheduler::CompactionJob), that may potentially need compacting. +#[async_trait] +pub trait PartitionsSource: Debug + Display + Send + Sync { + /// Get partition IDs. + /// + /// This method performs retries. + /// + /// This should only perform basic, efficient filtering. It MUST NOT inspect individual parquet files. + async fn fetch(&self) -> Vec; +} + +#[async_trait] +impl PartitionsSource for Arc +where + T: PartitionsSource + ?Sized, +{ + async fn fetch(&self) -> Vec { + self.as_ref().fetch().await + } +} diff --git a/compactor/src/components/partitions_source/not_empty.rs b/compactor/src/components/partitions_source/not_empty.rs index ff18dfc911..aba9acdf0e 100644 --- a/compactor/src/components/partitions_source/not_empty.rs +++ b/compactor/src/components/partitions_source/not_empty.rs @@ -1,10 +1,11 @@ use std::{fmt::Display, sync::Arc, time::Duration}; use async_trait::async_trait; -use compactor_scheduler::PartitionsSource; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use iox_time::TimeProvider; +use super::PartitionsSource; + #[derive(Debug)] pub struct NotEmptyPartitionsSourceWrapper where @@ -42,7 +43,7 @@ impl PartitionsSource for NotEmptyPartitionsSourceWrapper where T: PartitionsSource, { - async fn fetch(&self) -> Vec { + async fn fetch(&self) -> Vec { loop { let res = self.inner.fetch().await; if !res.is_empty() { @@ -55,11 +56,11 @@ where #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; use compactor_test_utils::AssertFutureExt; + use data_types::PartitionId; use iox_time::{MockProvider, Time}; - use super::*; + use super::{super::mock::MockPartitionsSource, *}; #[test] fn test_display() { @@ -90,7 +91,7 @@ mod tests { fut.assert_pending().await; // insert data but system is still throttled - let p = PartitionId::new(5); + let p = CompactionJob::new(PartitionId::new(5)); let parts = vec![p]; inner.set(parts.clone()); fut.assert_pending().await; diff --git a/compactor/src/components/partitions_source/randomize_order.rs b/compactor/src/components/partitions_source/randomize_order.rs index de4ba35659..1f5419946d 100644 --- a/compactor/src/components/partitions_source/randomize_order.rs +++ b/compactor/src/components/partitions_source/randomize_order.rs @@ -1,10 +1,11 @@ use std::fmt::Display; use async_trait::async_trait; -use compactor_scheduler::PartitionsSource; -use data_types::PartitionId; +use compactor_scheduler::CompactionJob; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; +use super::PartitionsSource; + #[derive(Debug)] pub struct RandomizeOrderPartitionsSourcesWrapper where @@ -37,7 +38,7 @@ impl PartitionsSource for RandomizeOrderPartitionsSourcesWrapper where T: PartitionsSource, { - async fn fetch(&self) -> Vec { + async fn fetch(&self) -> Vec { let mut partitions = self.inner.fetch().await; let mut rng = StdRng::seed_from_u64(self.seed); partitions.shuffle(&mut rng); @@ -47,9 +48,9 @@ where #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; + use data_types::PartitionId; - use super::*; + use super::{super::mock::MockPartitionsSource, *}; #[test] fn test_display() { @@ -67,21 +68,27 @@ mod tests { #[tokio::test] async fn test_fetch_some() { - let p_1 = PartitionId::new(5); - let p_2 = PartitionId::new(1); - let p_3 = PartitionId::new(12); - let partitions = vec![p_1, p_2, p_3]; + let p_1 = CompactionJob::new(PartitionId::new(5)); + let p_2 = CompactionJob::new(PartitionId::new(1)); + let p_3 = CompactionJob::new(PartitionId::new(12)); + let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()]; // shuffles let source = RandomizeOrderPartitionsSourcesWrapper::new( MockPartitionsSource::new(partitions.clone()), 123, ); - assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); + assert_eq!( + source.fetch().await, + vec![p_3.clone(), p_2.clone(), p_1.clone(),], + ); // is deterministic in same source for _ in 0..100 { - assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); + assert_eq!( + source.fetch().await, + vec![p_3.clone(), p_2.clone(), p_1.clone(),], + ); } // is deterministic with new source @@ -90,7 +97,10 @@ mod tests { MockPartitionsSource::new(partitions.clone()), 123, ); - assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],); + assert_eq!( + source.fetch().await, + vec![p_3.clone(), p_2.clone(), p_1.clone(),], + ); } // different seed => different output diff --git a/compactor/src/components/partitions_source/scheduled.rs b/compactor/src/components/partitions_source/scheduled.rs index 53fe408aaf..1a338b651e 100644 --- a/compactor/src/components/partitions_source/scheduled.rs +++ b/compactor/src/components/partitions_source/scheduled.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use async_trait::async_trait; -use compactor_scheduler::{CompactionJob, PartitionsSource, Scheduler}; -use data_types::PartitionId; +use compactor_scheduler::{CompactionJob, Scheduler}; + +use super::PartitionsSource; #[derive(Debug)] pub struct ScheduledPartitionsSource { @@ -17,9 +18,8 @@ impl ScheduledPartitionsSource { #[async_trait] impl PartitionsSource for ScheduledPartitionsSource { - async fn fetch(&self) -> Vec { - let job: Vec = self.scheduler.get_jobs().await; - job.into_iter().map(|job| job.partition_id).collect() + async fn fetch(&self) -> Vec { + self.scheduler.get_jobs().await } } diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs index 4bb73f4f0a..89f7669bd6 100644 --- a/compactor/src/driver.rs +++ b/compactor/src/driver.rs @@ -1,6 +1,7 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use chrono::Utc; +use compactor_scheduler::CompactionJob; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId}; use futures::{stream, StreamExt, TryStreamExt}; use iox_query::exec::query_tracing::send_metrics_to_tracing; @@ -36,7 +37,7 @@ pub async fn compact( components .partition_stream .stream() - .map(|partition_id| { + .map(|job| { let components = Arc::clone(components); // A root span is created for each partition. Later this can be linked to the @@ -48,7 +49,7 @@ pub async fn compact( compact_partition( span, - partition_id, + job, partition_timeout, Arc::clone(&df_semaphore), components, @@ -61,11 +62,12 @@ pub async fn compact( async fn compact_partition( mut span: SpanRecorder, - partition_id: PartitionId, + job: CompactionJob, partition_timeout: Duration, df_semaphore: Arc, components: Arc, ) { + let partition_id = job.partition_id; info!(partition_id = partition_id.get(), timeout = ?partition_timeout, "compact partition",); span.set_metadata("partition_id", partition_id.get().to_string()); let scratchpad = components.scratchpad_gen.pad(); @@ -76,7 +78,7 @@ async fn compact_partition( async { try_compact_partition( span, - partition_id, + job.clone(), df_semaphore, components, scratchpad, @@ -203,12 +205,13 @@ async fn compact_partition( /// . Round 2 happens or not depends on the stop condition async fn try_compact_partition( span: SpanRecorder, - partition_id: PartitionId, + job: CompactionJob, df_semaphore: Arc, components: Arc, scratchpad_ctx: Arc, transmit_progress_signal: Sender, ) -> Result<(), DynError> { + let partition_id = job.partition_id; let mut files = components.partition_files_source.fetch(partition_id).await; let partition_info = components.partition_info_source.fetch(partition_id).await?; let transmit_progress_signal = Arc::new(transmit_progress_signal); @@ -269,12 +272,13 @@ async fn try_compact_partition( let df_semaphore = Arc::clone(&df_semaphore); let transmit_progress_signal = Arc::clone(&transmit_progress_signal); let scratchpad = Arc::clone(&scratchpad_ctx); + let job = job.clone(); let branch_span = round_span.child("branch"); async move { execute_branch( branch_span, - partition_id, + job, branch, df_semaphore, components, @@ -298,7 +302,7 @@ async fn try_compact_partition( #[allow(clippy::too_many_arguments)] async fn execute_branch( span: SpanRecorder, - partition_id: PartitionId, + job: CompactionJob, branch: Vec, df_semaphore: Arc, components: Arc, @@ -418,7 +422,7 @@ async fn execute_branch( // files and update the upgraded files let (created_files, upgraded_files) = update_catalog( Arc::clone(&components), - partition_id, + job.clone(), &saved_parquet_file_state, files_to_delete, upgrade, @@ -627,13 +631,14 @@ async fn fetch_and_save_parquet_file_state( /// Return created and upgraded files async fn update_catalog( components: Arc, - partition_id: PartitionId, + job: CompactionJob, saved_parquet_file_state: &SavedParquetFileState, files_to_delete: Vec, files_to_upgrade: Vec, file_params_to_create: Vec, target_level: CompactionLevel, ) -> Result<(Vec, Vec), DynError> { + let partition_id = job.partition_id; let current_parquet_file_state = fetch_and_save_parquet_file_state(&components, partition_id).await; diff --git a/compactor_scheduler/src/lib.rs b/compactor_scheduler/src/lib.rs index d482e4085c..332212353e 100644 --- a/compactor_scheduler/src/lib.rs +++ b/compactor_scheduler/src/lib.rs @@ -50,7 +50,7 @@ pub(crate) use local_scheduler::{ // partitions_source trait mod partitions_source; -pub use partitions_source::*; +pub(crate) use partitions_source::*; // scheduler trait and associated types mod scheduler; diff --git a/compactor_scheduler/src/partitions_source.rs b/compactor_scheduler/src/partitions_source.rs index a4c4e0374c..8d485d2047 100644 --- a/compactor_scheduler/src/partitions_source.rs +++ b/compactor_scheduler/src/partitions_source.rs @@ -9,7 +9,7 @@ use parking_lot::Mutex; /// A source of partitions, noted by [`PartitionId`](data_types::PartitionId), that may potentially need compacting. #[async_trait] -pub trait PartitionsSource: Debug + Display + Send + Sync { +pub(crate) trait PartitionsSource: Debug + Display + Send + Sync { /// Get partition IDs. /// /// This method performs retries. @@ -28,19 +28,19 @@ where } } -pub use mock::MockPartitionsSource; +pub(crate) use mock::MockPartitionsSource; mod mock { use super::*; /// A mock structure for providing [partitions](PartitionId). #[derive(Debug)] - pub struct MockPartitionsSource { + pub(crate) struct MockPartitionsSource { partitions: Mutex>, } impl MockPartitionsSource { /// Create a new MockPartitionsSource. - pub fn new(partitions: Vec) -> Self { + pub(crate) fn new(partitions: Vec) -> Self { Self { partitions: Mutex::new(partitions), } @@ -48,7 +48,7 @@ mod mock { /// Set PartitionIds for MockPartitionsSource. #[allow(dead_code)] // not used anywhere - pub fn set(&self, partitions: Vec) { + pub(crate) fn set(&self, partitions: Vec) { *self.partitions.lock() = partitions; } } diff --git a/compactor_scheduler/src/scheduler.rs b/compactor_scheduler/src/scheduler.rs index 5e035c6d99..0404f3ff81 100644 --- a/compactor_scheduler/src/scheduler.rs +++ b/compactor_scheduler/src/scheduler.rs @@ -60,7 +60,7 @@ impl std::fmt::Display for SchedulerConfig { } /// Job assignment for a given partition. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct CompactionJob { #[allow(dead_code)] /// Unique identifier for this job. diff --git a/import_export/Cargo.toml b/import_export/Cargo.toml index b9096ae7ab..4cbba633eb 100644 --- a/import_export/Cargo.toml +++ b/import_export/Cargo.toml @@ -16,7 +16,7 @@ parquet_file = { path = "../parquet_file" } object_store = { workspace=true } observability_deps = { path = "../observability_deps" } schema = { path = "../schema" } -serde_json = "1.0.103" +serde_json = "1.0.104" thiserror = "1.0.44" tokio = { version = "1.29" } tokio-util = { version = "0.7.8" } diff --git a/influxdb2_client/Cargo.toml b/influxdb2_client/Cargo.toml index 911cbbccd7..29d18e1111 100644 --- a/influxdb2_client/Cargo.toml +++ b/influxdb2_client/Cargo.toml @@ -10,7 +10,7 @@ bytes = "1.4" futures = { version = "0.3", default-features = false } reqwest = { version = "0.11", default-features = false, features = ["stream", "json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.103" +serde_json = "1.0.104" snafu = "0.7" url = "2.4.0" uuid = { version = "1", features = ["v4"] } diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index dc4e0dc890..6de2baea20 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -67,7 +67,7 @@ libc = { version = "0.2" } num_cpus = "1.16.0" once_cell = { version = "1.18", features = ["parking_lot"] } rustyline = { version = "12.0", default-features = false, features = ["with-file-history"]} -serde_json = "1.0.103" +serde_json = "1.0.104" snafu = "0.7" tempfile = "3.7.0" thiserror = "1.0.44" @@ -93,7 +93,7 @@ predicate = { path = "../predicate" } predicates = "3.0.3" pretty_assertions = "1.4.0" proptest = { version = "1.2.0", default-features = false } -serde = "1.0.175" +serde = "1.0.176" test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } insta = { version = "1", features = ["yaml"] } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index abb7fc39b6..cb69d8f2a9 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -488,6 +488,7 @@ impl Config { rpc_write_timeout_seconds: Duration::new(3, 0), rpc_write_replicas: 1.try_into().unwrap(), rpc_write_max_outgoing_bytes: ingester_config.rpc_write_max_incoming_bytes, + rpc_write_health_error_window_seconds: Duration::from_secs(5), }; // create a CompactorConfig for the all in one server based on diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 8b3e2b028b..741ef21523 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -24,7 +24,7 @@ prost = "0.11" rand = "0.8.3" reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } schema = { path = "../schema" } -serde_json = "1.0.103" +serde_json = "1.0.104" tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread"] } tokio-stream = "0.1.13" thiserror = "1.0.44" diff --git a/ingester/src/dml_payload/write/op.rs b/ingester/src/dml_payload/write/op.rs index 2aba8ab578..5e1ac7a2fd 100644 --- a/ingester/src/dml_payload/write/op.rs +++ b/ingester/src/dml_payload/write/op.rs @@ -40,6 +40,18 @@ impl WriteOperation { } } + /// Do NOT remove this test annotation. This constructor exists to by-pass + /// safety invariant assertions for testing code only. + #[cfg(test)] + pub(crate) fn new_empty_invalid(namespace: NamespaceId, partition_key: PartitionKey) -> Self { + Self { + namespace, + tables: Default::default(), + partition_key, + span_context: None, + } + } + /// The namespace which the write is pub fn namespace(&self) -> NamespaceId { self.namespace diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index 4bb0c57a38..9d52cbcdad 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -75,12 +75,12 @@ where "Number of WAL files that have started to be replayed", ) .recorder(&[]); - let op_count_metric = metrics - .register_metric::( - "ingester_wal_replay_ops", - "Number of operations successfully replayed from the WAL", - ) - .recorder(&[]); + let op_count_metric = metrics.register_metric::( + "ingester_wal_replay_ops", + "Number of operations replayed from the WAL", + ); + let ok_op_count_metric = op_count_metric.recorder(&[("outcome", "success")]); + let empty_op_count_metric = op_count_metric.recorder(&[("outcome", "skipped_empty")]); let n_files = files.len(); info!(n_files, "found wal files for replay"); @@ -112,7 +112,7 @@ where ); // Replay this segment file - match replay_file(reader, sink, &op_count_metric).await? { + match replay_file(reader, sink, &ok_op_count_metric, &empty_op_count_metric).await? { v @ Some(_) => max_sequence = max_sequence.max(v), None => { // This file was empty and should be deleted. @@ -181,7 +181,8 @@ where async fn replay_file( file: wal::ClosedSegmentFileReader, sink: &T, - op_count_metric: &U64Counter, + ok_op_count_metric: &U64Counter, + empty_op_count_metric: &U64Counter, ) -> Result, WalReplayError> where T: DmlSink, @@ -212,6 +213,12 @@ where let namespace_id = NamespaceId::new(op.database_id); let partition_key = PartitionKey::from(op.partition_key); + if batches.is_empty() { + warn!(%namespace_id, "encountered wal op containing no table data, skipping replay"); + empty_op_count_metric.inc(1); + continue; + } + let op = WriteOperation::new( namespace_id, batches @@ -255,7 +262,7 @@ where .await .map_err(Into::::into)?; - op_count_metric.inc(1); + ok_op_count_metric.inc(1); } } @@ -368,12 +375,22 @@ mod tests { ), ); + // Emulate a mid-write crash by inserting an op with no data + let empty_op = WriteOperation::new_empty_invalid( + ARBITRARY_NAMESPACE_ID, + ARBITRARY_PARTITION_KEY.clone(), + ); + // The write portion of this test. // // Write two ops, rotate the file, and write a third op. { - let inner = - Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(()), Ok(())])); + let inner = Arc::new(MockDmlSink::default().with_apply_return(vec![ + Ok(()), + Ok(()), + Ok(()), + Ok(()), + ])); let wal = Wal::new(dir.path()) .await .expect("failed to initialise WAL"); @@ -405,8 +422,14 @@ mod tests { .await .expect("wal should not error"); + // Write the empty op + wal_sink + .apply(IngestOp::Write(empty_op)) + .await + .expect("wal should not error"); + // Assert the mock inner sink saw the calls - assert_eq!(inner.get_calls().len(), 3); + assert_eq!(inner.get_calls().len(), 4); } // Reinitialise the WAL @@ -448,14 +471,15 @@ mod tests { assert_eq!(max_sequence_number, Some(SequenceNumber::new(43))); - // Assert the ops were pushed into the DmlSink exactly as generated. + // Assert the ops were pushed into the DmlSink exactly as generated, + // barring the empty op which is skipped let ops = mock_iter.sink.get_calls(); assert_matches!( &*ops, &[ IngestOp::Write(ref w1), IngestOp::Write(ref w2), - IngestOp::Write(ref w3) + IngestOp::Write(ref w3), ] => { assert_write_ops_eq(w1.clone(), op1); assert_write_ops_eq(w2.clone(), op2); @@ -493,9 +517,16 @@ mod tests { let ops = metrics .get_instrument::>("ingester_wal_replay_ops") .expect("file counter not found") - .get_observer(&Attributes::from([])) + .get_observer(&Attributes::from(&[("outcome", "success")])) .expect("attributes not found") .fetch(); assert_eq!(ops, 3); + let ops = metrics + .get_instrument::>("ingester_wal_replay_ops") + .expect("file counter not found") + .get_observer(&Attributes::from(&[("outcome", "skipped_empty")])) + .expect("attributes not found") + .fetch(); + assert_eq!(ops, 1); } } diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml index 17fb9f17aa..5474542896 100644 --- a/iox_data_generator/Cargo.toml +++ b/iox_data_generator/Cargo.toml @@ -23,7 +23,7 @@ rand = { version = "0.8.3", features = ["small_rng"] } regex = "1.9" schema = { path = "../schema" } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.103" +serde_json = "1.0.104" snafu = "0.7" tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } toml = "0.7.6" diff --git a/iox_query_influxql/Cargo.toml b/iox_query_influxql/Cargo.toml index 9debd3bf2c..53ce018eda 100644 --- a/iox_query_influxql/Cargo.toml +++ b/iox_query_influxql/Cargo.toml @@ -20,7 +20,7 @@ predicate = { path = "../predicate" } query_functions = { path = "../query_functions" } regex = "1" schema = { path = "../schema" } -serde_json = "1.0.103" +serde_json = "1.0.104" thiserror = "1.0" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/ioxd_common/Cargo.toml b/ioxd_common/Cargo.toml index cc9edc9d68..facd97faf3 100644 --- a/ioxd_common/Cargo.toml +++ b/ioxd_common/Cargo.toml @@ -38,7 +38,7 @@ log = "0.4" parking_lot = "0.12" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.103" +serde_json = "1.0.104" serde_urlencoded = "0.7.0" snafu = "0.7" tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 05426c4745..60bbef82fd 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -240,6 +240,7 @@ pub async fn create_router_server_type( ingester_connections, router_config.rpc_write_replicas, &metrics, + router_config.rpc_write_health_error_window_seconds, ); let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer); diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs index c3fba4826d..528ad2eb92 100644 --- a/router/src/dml_handlers/rpc_write.rs +++ b/router/src/dml_handlers/rpc_write.rs @@ -130,15 +130,16 @@ impl RpcWrite { endpoints: impl IntoIterator, n_copies: NonZeroUsize, metrics: &metric::Registry, + error_window: Duration, ) -> Self where T: Send + Sync + Debug + 'static, N: Into>, { let endpoints = Balancer::new( - endpoints - .into_iter() - .map(|(client, name)| CircuitBreakingClient::new(client, name.into())), + endpoints.into_iter().map(|(client, name)| { + CircuitBreakingClient::new(client, name.into(), error_window) + }), Some(metrics), ); @@ -383,6 +384,7 @@ mod tests { const NAMESPACE_NAME: &str = "bananas"; const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + const ARBITRARY_TEST_ERROR_WINDOW: Duration = Duration::from_secs(5); // Start a new `NamespaceSchema` with only the given ID; the rest of the fields are arbitrary. fn new_empty_namespace_schema() -> Arc { @@ -460,6 +462,7 @@ mod tests { [(Arc::clone(&client), "mock client")], 1.try_into().unwrap(), &metric::Registry::default(), + ARBITRARY_TEST_ERROR_WINDOW, ); // Drive the RPC writer @@ -522,6 +525,7 @@ mod tests { ], 1.try_into().unwrap(), &metric::Registry::default(), + ARBITRARY_TEST_ERROR_WINDOW, ); // Drive the RPC writer @@ -590,6 +594,7 @@ mod tests { ], 1.try_into().unwrap(), &metric::Registry::default(), + ARBITRARY_TEST_ERROR_WINDOW, ); // Drive the RPC writer @@ -640,7 +645,10 @@ mod tests { circuit_1.set_healthy(false); let got = make_request( - [CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)], + [ + CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW) + .with_circuit_breaker(circuit_1), + ], 1, ) .await; @@ -661,7 +669,10 @@ mod tests { circuit_1.set_healthy(true); let got = make_request( - [CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)], + [ + CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW) + .with_circuit_breaker(circuit_1), + ], 1, ) .await; @@ -682,7 +693,10 @@ mod tests { circuit_1.set_healthy(true); let got = make_request( - [CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1)], + [ + CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW) + .with_circuit_breaker(circuit_1), + ], 1, ) .await; @@ -714,8 +728,10 @@ mod tests { let got = make_request( [ - CircuitBreakingClient::new(client_1, "client_1").with_circuit_breaker(circuit_1), - CircuitBreakingClient::new(client_2, "client_2").with_circuit_breaker(circuit_2), + CircuitBreakingClient::new(client_1, "client_1", ARBITRARY_TEST_ERROR_WINDOW) + .with_circuit_breaker(circuit_1), + CircuitBreakingClient::new(client_2, "client_2", ARBITRARY_TEST_ERROR_WINDOW) + .with_circuit_breaker(circuit_2), ], 2, // 2 copies required ) @@ -738,10 +754,18 @@ mod tests { let got = make_request( [ - CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") - .with_circuit_breaker(circuit_1), - CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") - .with_circuit_breaker(circuit_2), + CircuitBreakingClient::new( + Arc::clone(&client_1), + "client_1", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_1), + CircuitBreakingClient::new( + Arc::clone(&client_2), + "client_2", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_2), ], 2, // 2 copies required ) @@ -774,10 +798,18 @@ mod tests { circuit_2.set_healthy(true); let mut clients = vec![ - CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") - .with_circuit_breaker(circuit_1), - CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") - .with_circuit_breaker(circuit_2), + CircuitBreakingClient::new( + Arc::clone(&client_1), + "client_1", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_1), + CircuitBreakingClient::new( + Arc::clone(&client_2), + "client_2", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_2), ]; // The order should never affect the outcome. @@ -820,10 +852,18 @@ mod tests { let got = make_request( [ - CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") - .with_circuit_breaker(circuit_1), - CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") - .with_circuit_breaker(circuit_2), + CircuitBreakingClient::new( + Arc::clone(&client_1), + "client_1", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_1), + CircuitBreakingClient::new( + Arc::clone(&client_2), + "client_2", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_2), ], 2, // 2 copies required ) @@ -876,12 +916,24 @@ mod tests { circuit_3.set_healthy(true); let mut clients = vec![ - CircuitBreakingClient::new(Arc::clone(&client_1), "client_1") - .with_circuit_breaker(circuit_1), - CircuitBreakingClient::new(Arc::clone(&client_2), "client_2") - .with_circuit_breaker(circuit_2), - CircuitBreakingClient::new(Arc::clone(&client_3), "client_3") - .with_circuit_breaker(circuit_3), + CircuitBreakingClient::new( + Arc::clone(&client_1), + "client_1", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_1), + CircuitBreakingClient::new( + Arc::clone(&client_2), + "client_2", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_2), + CircuitBreakingClient::new( + Arc::clone(&client_3), + "client_3", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(circuit_3), ]; // The order should never affect the outcome. @@ -983,7 +1035,7 @@ mod tests { async move { let endpoints = upstreams.into_iter() .map(|(circuit, client)| { - CircuitBreakingClient::new(client, "bananas") + CircuitBreakingClient::new(client, "bananas",ARBITRARY_TEST_ERROR_WINDOW) .with_circuit_breaker(circuit) }); diff --git a/router/src/dml_handlers/rpc_write/balancer.rs b/router/src/dml_handlers/rpc_write/balancer.rs index 98f903f084..235ae6d7be 100644 --- a/router/src/dml_handlers/rpc_write/balancer.rs +++ b/router/src/dml_handlers/rpc_write/balancer.rs @@ -219,6 +219,8 @@ mod tests { use super::*; + const ARBITRARY_TEST_ERROR_WINDOW: Duration = Duration::from_secs(5); + /// No healthy nodes prevents a snapshot from being returned. #[tokio::test] async fn test_balancer_empty_iter() { @@ -227,16 +229,22 @@ mod tests { let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); circuit_err_1.set_healthy(false); circuit_err_1.set_should_probe(false); - let client_err_1 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err_1)); + let client_err_1 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_1)); let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); circuit_err_2.set_healthy(false); circuit_err_2.set_should_probe(false); - let client_err_2 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err_2)); + let client_err_2 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_2)); assert_eq!(circuit_err_1.ok_count(), 0); assert_eq!(circuit_err_2.ok_count(), 0); @@ -254,21 +262,31 @@ mod tests { let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); circuit_err_1.set_healthy(false); circuit_err_1.set_should_probe(true); - let client_err_1 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err_1)); + let client_err_1 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_1)); let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); circuit_err_2.set_healthy(false); circuit_err_2.set_should_probe(true); - let client_err_2 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err_2)); + let client_err_2 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_2)); let circuit_ok = Arc::new(MockCircuitBreaker::default()); circuit_ok.set_healthy(true); circuit_ok.set_should_probe(false); - let client_ok = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_ok)); + let client_ok = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_ok)); let balancer = Balancer::new([client_err_1, client_err_2, client_ok], None); @@ -323,21 +341,31 @@ mod tests { let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); circuit_err_1.set_healthy(false); circuit_err_1.set_should_probe(false); - let client_err_1 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err_1)); + let client_err_1 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_1)); let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); circuit_err_2.set_healthy(false); circuit_err_2.set_should_probe(false); - let client_err_2 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err_2)); + let client_err_2 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_2)); let circuit_ok = Arc::new(MockCircuitBreaker::default()); circuit_ok.set_healthy(true); - let client_ok = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_ok)); + let client_ok = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_ok)); assert_eq!(circuit_ok.ok_count(), 0); assert_eq!(circuit_err_1.ok_count(), 0); @@ -383,8 +411,12 @@ mod tests { let circuit = Arc::new(MockCircuitBreaker::default()); circuit.set_healthy(false); circuit.set_should_probe(false); - let client = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit)); + let client = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit)); assert_eq!(circuit.ok_count(), 0); @@ -432,21 +464,30 @@ mod tests { // two returns a healthy state, one is unhealthy. let circuit_err = Arc::new(MockCircuitBreaker::default()); circuit_err.set_healthy(false); - let client_err = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_err)); + let client_err = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err)); let circuit_ok_1 = Arc::new(MockCircuitBreaker::default()); circuit_ok_1.set_healthy(true); - let client_ok_1 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_ok_1)); + let client_ok_1 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_ok_1)); let circuit_ok_2 = Arc::new(MockCircuitBreaker::default()); circuit_ok_2.set_healthy(true); - let client_ok_2 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_ok_2)); + let client_ok_2 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bananas", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_ok_2)); let balancer = Balancer::new([client_err, client_ok_1, client_ok_2], None); @@ -478,22 +519,31 @@ mod tests { let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); circuit_err_1.set_healthy(false); circuit_err_1.set_should_probe(false); - let client_err_1 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bad-client-1") - .with_circuit_breaker(Arc::clone(&circuit_err_1)); + let client_err_1 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bad-client-1", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_1)); let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); circuit_err_2.set_healthy(false); circuit_err_2.set_should_probe(true); - let client_err_2 = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bad-client-2") - .with_circuit_breaker(Arc::clone(&circuit_err_2)); + let client_err_2 = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bad-client-2", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err_2)); let circuit_ok = Arc::new(MockCircuitBreaker::default()); circuit_ok.set_healthy(true); - let client_ok = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "ok-client") - .with_circuit_breaker(Arc::clone(&circuit_ok)); + let client_ok = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "ok-client", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_ok)); let balancer = Balancer::new([client_err_1, client_err_2, client_ok], None); @@ -558,9 +608,12 @@ mod tests { let circuit_err = Arc::new(MockCircuitBreaker::default()); circuit_err.set_healthy(false); circuit_err.set_should_probe(false); - let client_err = - CircuitBreakingClient::new(Arc::new(MockWriteClient::default()), "bad-client-1") - .with_circuit_breaker(Arc::clone(&circuit_err)); + let client_err = CircuitBreakingClient::new( + Arc::new(MockWriteClient::default()), + "bad-client-1", + ARBITRARY_TEST_ERROR_WINDOW, + ) + .with_circuit_breaker(Arc::clone(&circuit_err)); let balancer = Balancer::new([client_err], None); assert!(balancer.endpoints().is_none()); diff --git a/router/src/dml_handlers/rpc_write/circuit_breaker.rs b/router/src/dml_handlers/rpc_write/circuit_breaker.rs index 5c21213607..40ea8fc03e 100644 --- a/router/src/dml_handlers/rpc_write/circuit_breaker.rs +++ b/router/src/dml_handlers/rpc_write/circuit_breaker.rs @@ -16,13 +16,10 @@ use tokio::{ }; /// The limit on the ratio of the number of error requests to the number of -/// successful requests within an [`ERROR_WINDOW`] interval to be considered -/// healthy. +/// successful requests within the configured error window to be considered +/// healthy. If updating this value, remember to update the documentation +/// in the CLI flag for the configurable error window. const MAX_ERROR_RATIO: f32 = 0.8; -/// The (discrete) slices of time in which the error ratio must exceed -/// [`MAX_ERROR_RATIO`] to cause the [`CircuitBreaker`] to transition to the -/// unhealthy state. -const ERROR_WINDOW: Duration = Duration::from_secs(5); /// The maximum number of probe requests to allow when in an unhealthy state. const NUM_PROBES: u64 = 10; /// The length of time during which up to [`NUM_PROBES`] are allowed when in an @@ -64,7 +61,7 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1); /// # Implementation /// /// The circuit breaker is considered unhealthy when 80% ([`MAX_ERROR_RATIO`]) -/// of requests within a 5 second window [`ERROR_WINDOW`] fail. The breaker +/// of requests within the configured error window fail. The breaker /// becomes healthy again when the error rate falls below 80% /// ([`MAX_ERROR_RATIO`]) for the, at most, 10 probe requests ([`NUM_PROBES`]) /// allowed through within 1 second ([`PROBE_INTERVAL`]). @@ -80,7 +77,7 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1); /// breaker is considered healthy when the ratio of the number of error /// requests to the number of successful requests in the current window is /// less than [`MAX_ERROR_RATIO`]. If the ratio of errors exceeds -/// [`MAX_ERROR_RATIO`] within a single [`ERROR_WINDOW`] duration of time, +/// [`MAX_ERROR_RATIO`] within a single error window, /// the circuit breaker is then considered to be in the "open/unhealthy" /// state. /// @@ -102,12 +99,12 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1); /// /// Successful requests and errors are recorded when passed to /// [`CircuitBreaker::observe()`]. These counters are reset at intervals of -/// [`ERROR_WINDOW`], meaning that the ratio of errors must exceed +/// the configured error window, meaning that the ratio of errors must exceed /// [`MAX_ERROR_RATIO`] within a single window to open the circuit breaker to /// start being considered unhealthy. /// -/// A floor of at least `MAX_ERROR_RATIO * NUM_PROBES` must be observed per -/// [`ERROR_WINDOW`] before the circuit breaker opens / becomes unhealthy. +/// A floor of at least [`MAX_ERROR_RATIO`] * [`NUM_PROBES`] must be observed per +/// error window before the circuit breaker opens / becomes unhealthy. /// /// Error ratios are measured on every call to [`CircuitBreaker::is_healthy`], /// which should be done before determining whether to perform each request. @@ -121,8 +118,8 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1); /// /// ## Probing / Closing (becoming healthy) /// -/// Once a circuit breaker transitions to "open/unhealthy", up to [`NUM_PROBES`] -/// requests are allowed per [`PROBE_INTERVAL`], as determined by calling +/// Once a circuit breaker transitions to "open/unhealthy", up to 10 [`NUM_PROBES`] +/// requests are allowed per 1s [`PROBE_INTERVAL`], as determined by calling /// [`CircuitBreaker::should_probe`] before sending a request. This is referred /// to as "probing", allowing the client to discover the state of the /// (potentially unavailable) remote while bounding the number of requests that @@ -145,7 +142,7 @@ const PROBE_INTERVAL: Duration = Duration::from_secs(1); #[derive(Debug)] pub struct CircuitBreaker { /// Counters tracking the number of [`Ok`] and [`Err`] observed in the - /// current [`ERROR_WINDOW`]. + /// current error window. /// /// When the total number of requests ([`RequestCounterValue::total()`]) is /// less than [`NUM_PROBES`], the circuit is in the "probing" regime. When @@ -157,7 +154,7 @@ pub struct CircuitBreaker { /// should be allowed and resetting the [`PROBE_INTERVAL`]. probes: Mutex, - /// A task to reset the request count at intervals of [`ERROR_WINDOW`]. + /// A task to reset the request count at the configured error window. reset_task: JoinHandle<()>, /// A string description of the endpoint this [`CircuitBreaker`] models. @@ -178,13 +175,13 @@ struct ProbeState { } impl CircuitBreaker { - pub(crate) fn new(endpoint: impl Into>) -> Self { + pub(crate) fn new(endpoint: impl Into>, error_window: Duration) -> Self { let requests = Arc::new(RequestCounter::default()); let s = Self { requests: Arc::clone(&requests), probes: Mutex::new(ProbeState::default()), reset_task: tokio::spawn(async move { - let mut ticker = tokio::time::interval(ERROR_WINDOW); + let mut ticker = tokio::time::interval(error_window); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { ticker.tick().await; @@ -366,7 +363,7 @@ fn is_healthy(counts: RequestCounterValue) -> bool { /// Resets the absolute request counter values if the current circuit state is /// "closed" (healthy, not probing) at the time of the call, such that the there -/// must be NUM_PROBES * MAX_ERROR_RATIO number of failed requests to open the +/// must be [`NUM_PROBES`] * [`MAX_ERROR_RATIO`] number of failed requests to open the /// circuit (mark as unhealthy). /// /// Retains the closed/healthy state of the circuit. This is NOT an atomic @@ -466,7 +463,7 @@ mod tests { /// Return a new [`CircuitBreaker`] with the reset ticker disabled. fn new_no_reset() -> CircuitBreaker { - let c = CircuitBreaker::new("bananas"); + let c = CircuitBreaker::new("bananas", Duration::from_secs(5)); c.reset_task.abort(); c } @@ -599,13 +596,13 @@ mod tests { } /// The circuit is marked unhealthy if the error rate exceeds - /// MAX_ERROR_RATIO within a single ERROR_WINDOW (approximately). + /// MAX_ERROR_RATIO within a single error window (approximately). /// /// This test ensures the counter reset logic prevents errors from different - /// ERROR_WINDOW periods from changing the circuit to open/unhealthy. + /// error window periods from changing the circuit to open/unhealthy. #[tokio::test] async fn test_periodic_counter_reset() { - let c = CircuitBreaker::new("bananas"); + let c = CircuitBreaker::new("bananas", Duration::from_secs(5)); // Assert the circuit breaker as healthy. assert!(c.is_healthy()); diff --git a/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs b/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs index b6436f403f..0efd42a96c 100644 --- a/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs +++ b/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use async_trait::async_trait; use generated_types::influxdata::iox::ingester::v1::WriteRequest; @@ -51,9 +51,13 @@ pub(super) struct CircuitBreakingClient { } impl CircuitBreakingClient { - pub(super) fn new(inner: T, endpoint_name: impl Into>) -> Self { + pub(super) fn new( + inner: T, + endpoint_name: impl Into>, + error_window: Duration, + ) -> Self { let endpoint_name = endpoint_name.into(); - let state = CircuitBreaker::new(Arc::clone(&endpoint_name)); + let state = CircuitBreaker::new(Arc::clone(&endpoint_name), error_window); state.set_healthy(); Self { inner, @@ -181,8 +185,12 @@ mod tests { #[tokio::test] async fn test_healthy() { let circuit_breaker = Arc::new(MockCircuitBreaker::default()); - let wrapper = CircuitBreakingClient::new(MockWriteClient::default(), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_breaker)); + let wrapper = CircuitBreakingClient::new( + MockWriteClient::default(), + "bananas", + Duration::from_secs(5), + ) + .with_circuit_breaker(Arc::clone(&circuit_breaker)); circuit_breaker.set_healthy(true); assert_eq!(wrapper.is_healthy(), circuit_breaker.is_healthy()); @@ -213,8 +221,9 @@ mod tests { .into_iter(), )), ); - let wrapper = CircuitBreakingClient::new(Arc::clone(&mock_client), "bananas") - .with_circuit_breaker(Arc::clone(&circuit_breaker)); + let wrapper = + CircuitBreakingClient::new(Arc::clone(&mock_client), "bananas", Duration::from_secs(5)) + .with_circuit_breaker(Arc::clone(&circuit_breaker)); assert_eq!(circuit_breaker.ok_count(), 0); assert_eq!(circuit_breaker.err_count(), 0); diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs index d8cdb5d2f1..6550b3b58e 100644 --- a/router/tests/common/mod.rs +++ b/router/tests/common/mod.rs @@ -34,6 +34,7 @@ pub const TEST_RETENTION_PERIOD: Duration = Duration::from_secs(3600); pub struct TestContextBuilder { namespace_autocreation: MissingNamespaceAction, single_tenancy: bool, + rpc_write_error_window: Duration, } impl Default for TestContextBuilder { @@ -41,6 +42,7 @@ impl Default for TestContextBuilder { Self { namespace_autocreation: MissingNamespaceAction::Reject, single_tenancy: false, + rpc_write_error_window: Duration::from_secs(5), } } } @@ -79,6 +81,7 @@ impl TestContextBuilder { self.single_tenancy, catalog, metrics, + self.rpc_write_error_window, ) .await } @@ -94,6 +97,7 @@ pub struct TestContext { namespace_autocreation: MissingNamespaceAction, single_tenancy: bool, + rpc_write_error_window: Duration, } // This mass of words is certainly a downside of chained handlers. @@ -133,12 +137,14 @@ impl TestContext { single_tenancy: bool, catalog: Arc, metrics: Arc, + rpc_write_error_window: Duration, ) -> Self { let client = Arc::new(MockWriteClient::default()); let rpc_writer = RpcWrite::new( [(Arc::clone(&client), "mock client")], 1.try_into().unwrap(), &metrics, + rpc_write_error_window, ); let ns_cache = Arc::new(ReadThroughCache::new( @@ -195,6 +201,7 @@ impl TestContext { namespace_autocreation, single_tenancy, + rpc_write_error_window, } } @@ -208,6 +215,7 @@ impl TestContext { self.single_tenancy, catalog, metrics, + self.rpc_write_error_window, ) .await } diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml index 12eb8c23c1..a5374ff1a4 100644 --- a/service_grpc_flight/Cargo.toml +++ b/service_grpc_flight/Cargo.toml @@ -26,7 +26,7 @@ bytes = "1.4" futures = "0.3" prost = "0.11" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.103" +serde_json = "1.0.104" snafu = "0.7" tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tonic = { workspace = true } diff --git a/service_grpc_influxrpc/Cargo.toml b/service_grpc_influxrpc/Cargo.toml index bee0b7c30d..4f9b66194c 100644 --- a/service_grpc_influxrpc/Cargo.toml +++ b/service_grpc_influxrpc/Cargo.toml @@ -30,7 +30,7 @@ pin-project = "1.1" prost = "0.11" regex = "1.9.1" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.103" +serde_json = "1.0.104" snafu = "0.7" tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } diff --git a/tracker/Cargo.toml b/tracker/Cargo.toml index 2d33754e81..83f584d6e7 100644 --- a/tracker/Cargo.toml +++ b/tracker/Cargo.toml @@ -19,7 +19,7 @@ tokio = { version = "1.29", features = ["macros", "parking_lot", "sync", "time"] tokio-util = { version = "0.7.8" } trace = { path = "../trace"} workspace-hack = { version = "0.1", path = "../workspace-hack" } -sysinfo = "0.29.6" +sysinfo = "0.29.7" [dev-dependencies] tempfile = "3.7.0"