From d6c4b51ba84a3567affc8337ab74d22eedda6e49 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 21 Jun 2023 14:55:02 +0200 Subject: [PATCH 1/4] refactor: introduce catalog query indirection Add indirection between the CatalogPartitionFilesSource (within the retry-loop) and the underlying catalog. --- .../partition_files_source/catalog.rs | 57 +++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/compactor/src/components/partition_files_source/catalog.rs b/compactor/src/components/partition_files_source/catalog.rs index 0fdd1865c7..3f48302b60 100644 --- a/compactor/src/components/partition_files_source/catalog.rs +++ b/compactor/src/components/partition_files_source/catalog.rs @@ -1,20 +1,45 @@ -use std::{fmt::Display, sync::Arc}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; use data_types::{ParquetFile, PartitionId}; use iox_catalog::interface::Catalog; -use super::PartitionFilesSource; +use super::{rate_limit::QueryRateLimit, PartitionFilesSource}; -#[derive(Debug)] -pub struct CatalogPartitionFilesSource { - backoff_config: BackoffConfig, - catalog: Arc, +#[async_trait] +pub(crate) trait CatalogQuerier: Send + Sync + Debug { + async fn get_partitions( + &self, + partition_id: PartitionId, + ) -> Result, iox_catalog::interface::Error>; } -impl CatalogPartitionFilesSource { - pub fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { +#[async_trait] +impl CatalogQuerier for Arc { + async fn get_partitions( + &self, + partition_id: PartitionId, + ) -> Result, iox_catalog::interface::Error> { + self.repositories() + .await + .parquet_files() + .list_by_partition_not_to_delete(partition_id) + .await + } +} + +#[derive(Debug)] +pub struct CatalogPartitionFilesSource>> { + backoff_config: BackoffConfig, + catalog: T, +} + +impl CatalogPartitionFilesSource { + pub fn new(backoff_config: BackoffConfig, catalog: T) -> Self { Self { backoff_config, catalog, @@ -22,23 +47,21 @@ impl CatalogPartitionFilesSource { } } -impl Display for CatalogPartitionFilesSource { +impl Display for CatalogPartitionFilesSource { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "catalog") } } #[async_trait] -impl PartitionFilesSource for CatalogPartitionFilesSource { - async fn fetch(&self, partition: PartitionId) -> Vec { +impl PartitionFilesSource for CatalogPartitionFilesSource +where + T: CatalogQuerier, +{ + async fn fetch(&self, partition_id: PartitionId) -> Vec { Backoff::new(&self.backoff_config) .retry_all_errors("parquet_files_of_given_partition", || async { - self.catalog - .repositories() - .await - .parquet_files() - .list_by_partition_not_to_delete(partition) - .await + self.catalog.get_partitions(partition_id).await }) .await .expect("retry forever") From 48e73fdf632b403eccf8b90a166a9c55cf8debc4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 21 Jun 2023 15:37:15 +0200 Subject: [PATCH 2/4] feat(compactor): partition fetch rate limiter Implements a (very) simple rate limiter that permits at most N requests per second, smoothed over a full second. --- .../components/partition_files_source/mod.rs | 1 + .../partition_files_source/rate_limit.rs | 109 ++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 compactor/src/components/partition_files_source/rate_limit.rs diff --git a/compactor/src/components/partition_files_source/mod.rs b/compactor/src/components/partition_files_source/mod.rs index 25b955fb57..da6f099e73 100644 --- a/compactor/src/components/partition_files_source/mod.rs +++ b/compactor/src/components/partition_files_source/mod.rs @@ -5,6 +5,7 @@ use data_types::{ParquetFile, PartitionId}; pub mod catalog; pub mod mock; +pub mod rate_limit; /// Finds files in a partition for compaction #[async_trait] diff --git a/compactor/src/components/partition_files_source/rate_limit.rs b/compactor/src/components/partition_files_source/rate_limit.rs new file mode 100644 index 0000000000..1599838fc7 --- /dev/null +++ b/compactor/src/components/partition_files_source/rate_limit.rs @@ -0,0 +1,109 @@ +use std::{sync::Mutex, time::Duration}; + +use async_trait::async_trait; +use data_types::{ParquetFile, PartitionId}; +use observability_deps::tracing::warn; +use tokio::time::Instant; + +use super::catalog::CatalogQuerier; + +/// A [`CatalogQuerier`] rate limiter that smooths `N` queries over a second. +#[derive(Debug)] +pub struct QueryRateLimit { + inner: T, + + last_query: Mutex, + min_interval: Duration, +} + +impl QueryRateLimit { + pub(crate) fn new(inner: T, rps: usize) -> Self { + Self { + inner, + last_query: Mutex::new(Instant::now()), + min_interval: Duration::from_secs(1) / rps as u32, + } + } + + fn can_proceed(&self) -> bool { + let mut last_query = self.last_query.lock().unwrap(); + let now = Instant::now(); + + // Has enough time passed since the last query was allowed? + let next_allowed = last_query.checked_add(self.min_interval).unwrap(); + if now < next_allowed { + return false; + } + + *last_query = now; + true + } +} + +#[async_trait] +impl CatalogQuerier for QueryRateLimit +where + T: CatalogQuerier, +{ + async fn get_partitions( + &self, + partition_id: PartitionId, + ) -> Result, iox_catalog::interface::Error> { + while !self.can_proceed() { + warn!(%partition_id, "partition fetch rate limited"); + + // Don't busy loop - wait the fractions of a second before a retry + // is allowed. + tokio::time::sleep(self.min_interval).await; + } + self.inner.get_partitions(partition_id).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// A [`CatalogQuerier`] that always returns OK, and counts the number of + /// calls made. + #[derive(Debug, Default)] + struct MockInner(Mutex); + #[async_trait] + impl CatalogQuerier for &MockInner { + async fn get_partitions( + &self, + _partition_id: PartitionId, + ) -> Result, iox_catalog::interface::Error> { + *self.0.lock().unwrap() += 1; + Ok(vec![]) + } + } + + #[tokio::test] + async fn test_rate_limit() { + const ALLOWED_PER_SECOND: usize = 100; + + let inner = MockInner::default(); + let r = QueryRateLimit::new(&inner, ALLOWED_PER_SECOND); + + let start = Instant::now(); + + // If there are ALLOWED_PER_SECOND queries allowed per second, then it + // should take 1 second to issue ALLOWED_PER_SECOND number of queries. + // + // Attempt to make 1/10th the number of permissible queries per second, + // which should take at least 1/10th of a second due to smoothing, so + // the test does not take so long. + for _ in 0..(ALLOWED_PER_SECOND / 10) { + r.get_partitions(PartitionId::new(42)).await.unwrap(); + } + + // It should have taken at least 1/10th of a second + let duration = Instant::now() - start; + assert!(duration > Duration::from_millis(ALLOWED_PER_SECOND as u64 / 10)); + + // Exactly 1/10th the number of queries should be dispatched to the + // inner impl. + assert_eq!(*inner.0.lock().unwrap(), ALLOWED_PER_SECOND / 10); + } +} From d1cbbd27b149fe7800910c4271b4f56a95a73aa5 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 21 Jun 2023 15:47:32 +0200 Subject: [PATCH 3/4] feat(compactor): config partition query rate limit Allow the partition fetch queries to be (optionally) rate limited via runtime config. --- clap_blocks/src/compactor.rs | 11 +++++++++++ compactor/src/components/hardcoded.rs | 18 +++++++++++++----- compactor/src/components/report.rs | 2 ++ compactor/src/config.rs | 6 ++++++ compactor_test_utils/src/lib.rs | 1 + influxdb_iox/src/commands/run/all_in_one.rs | 1 + ioxd_compactor/src/lib.rs | 2 ++ 7 files changed, 36 insertions(+), 5 deletions(-) diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index 7e69abaaf1..35ce013788 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -235,4 +235,15 @@ pub struct CompactorConfig { action )] pub max_num_columns_per_table: usize, + + /// Limit the number of partition fetch queries to at most the specified + /// number of queries per second. + /// + /// Queries are smoothed over the full second. + #[clap( + long = "max-partition-fetch-queries-per-second", + env = "INFLUXDB_IOX_MAX_PARTITION_FETCH_QUERIES_PER_SECOND", + action + )] + pub max_partition_fetch_queries_per_second: Option, } diff --git a/compactor/src/components/hardcoded.rs b/compactor/src/components/hardcoded.rs index 2c129930b5..4e9278a14a 100644 --- a/compactor/src/components/hardcoded.rs +++ b/compactor/src/components/hardcoded.rs @@ -47,7 +47,9 @@ use super::{ logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper, mock::MockPartitionDoneSink, PartitionDoneSink, }, - partition_files_source::{catalog::CatalogPartitionFilesSource, PartitionFilesSource}, + partition_files_source::{ + catalog::CatalogPartitionFilesSource, rate_limit::QueryRateLimit, PartitionFilesSource, + }, partition_filter::{ and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter, greater_size_matching_files::GreaterSizeMatchingFilesPartitionFilter, @@ -265,10 +267,16 @@ fn make_partition_info_source(config: &Config) -> Arc { } fn make_partition_files_source(config: &Config) -> Arc { - Arc::new(CatalogPartitionFilesSource::new( - config.backoff_config.clone(), - Arc::clone(&config.catalog), - )) + match config.max_partition_fetch_queries_per_second { + Some(rps) => Arc::new(CatalogPartitionFilesSource::new( + config.backoff_config.clone(), + QueryRateLimit::new(Arc::clone(&config.catalog), rps), + )), + None => Arc::new(CatalogPartitionFilesSource::new( + config.backoff_config.clone(), + Arc::clone(&config.catalog), + )), + } } fn make_round_info_source(config: &Config) -> Arc { diff --git a/compactor/src/components/report.rs b/compactor/src/components/report.rs index 3f2e34934f..20f6300d46 100644 --- a/compactor/src/components/report.rs +++ b/compactor/src/components/report.rs @@ -39,6 +39,7 @@ pub fn log_config(config: &Config) { all_errors_are_fatal, max_num_columns_per_table, max_num_files_per_plan, + max_partition_fetch_queries_per_second, } = &config; let (shard_cfg_n_shards, shard_cfg_shard_id) = match shard_config { @@ -85,6 +86,7 @@ pub fn log_config(config: &Config) { all_errors_are_fatal, max_num_columns_per_table, max_num_files_per_plan, + max_partition_fetch_queries_per_second, "config", ); } diff --git a/compactor/src/config.rs b/compactor/src/config.rs index 9561c135e0..f6bbfc3e08 100644 --- a/compactor/src/config.rs +++ b/compactor/src/config.rs @@ -140,6 +140,12 @@ pub struct Config { /// max number of files per compaction plan pub max_num_files_per_plan: usize, + + /// Limit the number of partition fetch queries to at most the specified + /// number of queries per second. + /// + /// Queries are smoothed over the full second. + pub max_partition_fetch_queries_per_second: Option, } impl Config { diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs index 54cca70f54..a61d508247 100644 --- a/compactor_test_utils/src/lib.rs +++ b/compactor_test_utils/src/lib.rs @@ -152,6 +152,7 @@ impl TestSetupBuilder { all_errors_are_fatal: true, max_num_columns_per_table: 200, max_num_files_per_plan: 200, + max_partition_fetch_queries_per_second: None, }; let bytes_written = Arc::new(AtomicUsize::new(0)); diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 6de669f22e..97e3694a0a 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -502,6 +502,7 @@ impl Config { process_once: false, max_num_columns_per_table: 200, max_num_files_per_plan: 200, + max_partition_fetch_queries_per_second: Some(500), }; let querier_config = QuerierConfig { diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index b60fa147b0..71821ce398 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -192,6 +192,8 @@ pub async fn create_compactor_server_type( all_errors_are_fatal: false, max_num_columns_per_table: compactor_config.max_num_columns_per_table, max_num_files_per_plan: compactor_config.max_num_files_per_plan, + max_partition_fetch_queries_per_second: compactor_config + .max_partition_fetch_queries_per_second, }); Arc::new(CompactorServerType::new( From cb79429b5fd2a7cd877ab37f3d2b790e500a9701 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 22 Jun 2023 14:07:47 +0200 Subject: [PATCH 4/4] refactor: wait until next attempt deadline Minor optimisation for cases where load exceeds the limit, but not by much - sleep until the next query is allowed, rather than a full query period. --- .../components/partition_files_source/rate_limit.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/compactor/src/components/partition_files_source/rate_limit.rs b/compactor/src/components/partition_files_source/rate_limit.rs index 1599838fc7..06d14efab2 100644 --- a/compactor/src/components/partition_files_source/rate_limit.rs +++ b/compactor/src/components/partition_files_source/rate_limit.rs @@ -25,18 +25,18 @@ impl QueryRateLimit { } } - fn can_proceed(&self) -> bool { + fn can_proceed(&self) -> Option { let mut last_query = self.last_query.lock().unwrap(); let now = Instant::now(); // Has enough time passed since the last query was allowed? let next_allowed = last_query.checked_add(self.min_interval).unwrap(); if now < next_allowed { - return false; + return Some(next_allowed - now); } *last_query = now; - true + None } } @@ -49,12 +49,12 @@ where &self, partition_id: PartitionId, ) -> Result, iox_catalog::interface::Error> { - while !self.can_proceed() { + while let Some(d) = self.can_proceed() { warn!(%partition_id, "partition fetch rate limited"); // Don't busy loop - wait the fractions of a second before a retry // is allowed. - tokio::time::sleep(self.min_interval).await; + tokio::time::sleep(d).await; } self.inner.get_partitions(partition_id).await }