Merge pull request #8035 from influxdata/dom/compactor-query-rate-limit

feat(compactor): partition fetch query rate limit

commit 8604a1a0cf
@@ -235,4 +235,15 @@ pub struct CompactorConfig {
         action
     )]
     pub max_num_columns_per_table: usize,
+
+    /// Limit the number of partition fetch queries to at most the specified
+    /// number of queries per second.
+    ///
+    /// Queries are smoothed over the full second.
+    #[clap(
+        long = "max-partition-fetch-queries-per-second",
+        env = "INFLUXDB_IOX_MAX_PARTITION_FETCH_QUERIES_PER_SECOND",
+        action
+    )]
+    pub max_partition_fetch_queries_per_second: Option<usize>,
 }
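The flag is optional: when unset, partition fetch queries are not limited (see the `match` in `make_partition_files_source` below). "Smoothed over the full second" means the limit is enforced as a minimum interval between consecutive queries rather than as a per-second burst budget. A small sketch of the interval arithmetic this diff uses later (`Duration::from_secs(1) / rps`); the 500 qps figure is illustrative:

    use std::time::Duration;

    // --max-partition-fetch-queries-per-second=500 implies at least
    // 2ms between consecutive partition fetch queries.
    let rps: u32 = 500;
    assert_eq!(Duration::from_secs(1) / rps, Duration::from_millis(2));
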
@@ -47,7 +47,9 @@ use super::{
         logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
         mock::MockPartitionDoneSink, PartitionDoneSink,
     },
-    partition_files_source::{catalog::CatalogPartitionFilesSource, PartitionFilesSource},
+    partition_files_source::{
+        catalog::CatalogPartitionFilesSource, rate_limit::QueryRateLimit, PartitionFilesSource,
+    },
     partition_filter::{
         and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter,
         greater_size_matching_files::GreaterSizeMatchingFilesPartitionFilter,
@@ -270,10 +272,16 @@ fn make_partition_info_source(config: &Config) -> Arc<dyn PartitionInfoSource> {
 }
 
 fn make_partition_files_source(config: &Config) -> Arc<dyn PartitionFilesSource> {
-    Arc::new(CatalogPartitionFilesSource::new(
-        config.backoff_config.clone(),
-        Arc::clone(&config.catalog),
-    ))
+    match config.max_partition_fetch_queries_per_second {
+        Some(rps) => Arc::new(CatalogPartitionFilesSource::new(
+            config.backoff_config.clone(),
+            QueryRateLimit::new(Arc::clone(&config.catalog), rps),
+        )),
+        None => Arc::new(CatalogPartitionFilesSource::new(
+            config.backoff_config.clone(),
+            Arc::clone(&config.catalog),
+        )),
+    }
 }
 
 fn make_round_info_source(config: &Config) -> Arc<dyn RoundInfoSource> {
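A note on the duplicated arms: the `Some` and `None` branches build different concrete types, `CatalogPartitionFilesSource<QueryRateLimit<Arc<dyn Catalog>>>` and `CatalogPartitionFilesSource<Arc<dyn Catalog>>`, so they cannot share one `Arc::new` call; the `Arc<dyn PartitionFilesSource>` return type is what erases the difference. A minimal sketch of the same pattern with hypothetical stand-in types (not from this PR):

    use std::sync::Arc;

    trait Source {
        fn describe(&self) -> String;
    }

    struct Wrapper<T>(T);

    impl<T: std::fmt::Debug> Source for Wrapper<T> {
        fn describe(&self) -> String {
            format!("{:?}", self.0)
        }
    }

    fn make(limited: bool) -> Arc<dyn Source> {
        // Each arm yields a distinct Wrapper<T>; the trait object unifies them.
        match limited {
            true => Arc::new(Wrapper(2u32)),       // like QueryRateLimit<...>
            false => Arc::new(Wrapper("catalog")), // like Arc<dyn Catalog>
        }
    }
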
@@ -1,20 +1,45 @@
-use std::{fmt::Display, sync::Arc};
+use std::{
+    fmt::{Debug, Display},
+    sync::Arc,
+};
 
 use async_trait::async_trait;
 use backoff::{Backoff, BackoffConfig};
 use data_types::{ParquetFile, PartitionId};
 use iox_catalog::interface::Catalog;
 
-use super::PartitionFilesSource;
+use super::{rate_limit::QueryRateLimit, PartitionFilesSource};
 
-#[derive(Debug)]
-pub struct CatalogPartitionFilesSource {
-    backoff_config: BackoffConfig,
-    catalog: Arc<dyn Catalog>,
+#[async_trait]
+pub(crate) trait CatalogQuerier: Send + Sync + Debug {
+    async fn get_partitions(
+        &self,
+        partition_id: PartitionId,
+    ) -> Result<Vec<ParquetFile>, iox_catalog::interface::Error>;
 }
 
-impl CatalogPartitionFilesSource {
-    pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
+#[async_trait]
+impl CatalogQuerier for Arc<dyn Catalog> {
+    async fn get_partitions(
+        &self,
+        partition_id: PartitionId,
+    ) -> Result<Vec<ParquetFile>, iox_catalog::interface::Error> {
+        self.repositories()
+            .await
+            .parquet_files()
+            .list_by_partition_not_to_delete(partition_id)
+            .await
+    }
+}
+
+#[derive(Debug)]
+pub struct CatalogPartitionFilesSource<T = QueryRateLimit<Arc<dyn Catalog>>> {
+    backoff_config: BackoffConfig,
+    catalog: T,
+}
+
+impl<T> CatalogPartitionFilesSource<T> {
+    pub fn new(backoff_config: BackoffConfig, catalog: T) -> Self {
         Self {
             backoff_config,
             catalog,
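The `T = QueryRateLimit<Arc<dyn Catalog>>` default only matters where the type is named without parameters (e.g. in other code that writes `CatalogPartitionFilesSource` bare); at `new()` call sites `T` is inferred from the argument. A minimal construction sketch, assuming a `catalog: Arc<dyn Catalog>` binding and a `Default` impl for `BackoffConfig` (both assumptions), and placed inside this crate since `QueryRateLimit::new` is `pub(crate)`:

    // T inferred as Arc<dyn Catalog>: fetches hit the catalog directly.
    let unlimited =
        CatalogPartitionFilesSource::new(BackoffConfig::default(), Arc::clone(&catalog));

    // T inferred as QueryRateLimit<Arc<dyn Catalog>>: at most 100 fetches/second.
    let limited = CatalogPartitionFilesSource::new(
        BackoffConfig::default(),
        QueryRateLimit::new(Arc::clone(&catalog), 100),
    );
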
@@ -22,23 +47,21 @@ impl CatalogPartitionFilesSource {
     }
 }
 
-impl Display for CatalogPartitionFilesSource {
+impl<T> Display for CatalogPartitionFilesSource<T> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "catalog")
     }
 }
 
 #[async_trait]
-impl PartitionFilesSource for CatalogPartitionFilesSource {
-    async fn fetch(&self, partition: PartitionId) -> Vec<ParquetFile> {
+impl<T> PartitionFilesSource for CatalogPartitionFilesSource<T>
+where
+    T: CatalogQuerier,
+{
+    async fn fetch(&self, partition_id: PartitionId) -> Vec<ParquetFile> {
         Backoff::new(&self.backoff_config)
             .retry_all_errors("parquet_files_of_given_partition", || async {
-                self.catalog
-                    .repositories()
-                    .await
-                    .parquet_files()
-                    .list_by_partition_not_to_delete(partition)
-                    .await
+                self.catalog.get_partitions(partition_id).await
             })
             .await
             .expect("retry forever")
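With `fetch` delegating to `CatalogQuerier::get_partitions`, the `Backoff` retry-forever loop now wraps whichever querier was injected: when that querier is a `QueryRateLimit`, the rate-limit sleep happens inside the retried closure, so every retry is itself subject to the limit.
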
@@ -5,6 +5,7 @@ use data_types::{ParquetFile, PartitionId};
 
 pub mod catalog;
 pub mod mock;
+pub mod rate_limit;
 
 /// Finds files in a partition for compaction
 #[async_trait]
@@ -0,0 +1,109 @@
+use std::{sync::Mutex, time::Duration};
+
+use async_trait::async_trait;
+use data_types::{ParquetFile, PartitionId};
+use observability_deps::tracing::warn;
+use tokio::time::Instant;
+
+use super::catalog::CatalogQuerier;
+
+/// A [`CatalogQuerier`] rate limiter that smooths `N` queries over a second.
+#[derive(Debug)]
+pub struct QueryRateLimit<T> {
+    inner: T,
+
+    last_query: Mutex<Instant>,
+    min_interval: Duration,
+}
+
+impl<T> QueryRateLimit<T> {
+    pub(crate) fn new(inner: T, rps: usize) -> Self {
+        Self {
+            inner,
+            last_query: Mutex::new(Instant::now()),
+            min_interval: Duration::from_secs(1) / rps as u32,
+        }
+    }
+
+    fn can_proceed(&self) -> Option<Duration> {
+        let mut last_query = self.last_query.lock().unwrap();
+        let now = Instant::now();
+
+        // Has enough time passed since the last query was allowed?
+        let next_allowed = last_query.checked_add(self.min_interval).unwrap();
+        if now < next_allowed {
+            return Some(next_allowed - now);
+        }
+
+        *last_query = now;
+        None
+    }
+}
+
+#[async_trait]
+impl<T> CatalogQuerier for QueryRateLimit<T>
+where
+    T: CatalogQuerier,
+{
+    async fn get_partitions(
+        &self,
+        partition_id: PartitionId,
+    ) -> Result<Vec<ParquetFile>, iox_catalog::interface::Error> {
+        while let Some(d) = self.can_proceed() {
+            warn!(%partition_id, "partition fetch rate limited");
+
+            // Don't busy loop - wait the fractions of a second before a retry
+            // is allowed.
+            tokio::time::sleep(d).await;
+        }
+        self.inner.get_partitions(partition_id).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// A [`CatalogQuerier`] that always returns OK, and counts the number of
+    /// calls made.
+    #[derive(Debug, Default)]
+    struct MockInner(Mutex<usize>);
+    #[async_trait]
+    impl CatalogQuerier for &MockInner {
+        async fn get_partitions(
+            &self,
+            _partition_id: PartitionId,
+        ) -> Result<Vec<ParquetFile>, iox_catalog::interface::Error> {
+            *self.0.lock().unwrap() += 1;
+            Ok(vec![])
+        }
+    }
+
+    #[tokio::test]
+    async fn test_rate_limit() {
+        const ALLOWED_PER_SECOND: usize = 100;
+
+        let inner = MockInner::default();
+        let r = QueryRateLimit::new(&inner, ALLOWED_PER_SECOND);
+
+        let start = Instant::now();
+
+        // If there are ALLOWED_PER_SECOND queries allowed per second, then it
+        // should take 1 second to issue ALLOWED_PER_SECOND number of queries.
+        //
+        // Attempt to make 1/10th the number of permissible queries per second,
+        // which should take at least 1/10th of a second due to smoothing, so
+        // the test does not take so long.
+        for _ in 0..(ALLOWED_PER_SECOND / 10) {
+            r.get_partitions(PartitionId::new(42)).await.unwrap();
+        }
+
+        // It should have taken at least 1/10th of a second
+        let duration = Instant::now() - start;
+        assert!(duration > Duration::from_millis(ALLOWED_PER_SECOND as u64 / 10));
+
+        // Exactly 1/10th the number of queries should be dispatched to the
+        // inner impl.
+        assert_eq!(*inner.0.lock().unwrap(), ALLOWED_PER_SECOND / 10);
+    }
+}
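The limiter stores only the timestamp of the last admitted query in a `Mutex<Instant>` rather than keeping a token bucket: each caller must arrive at least `min_interval` after the previous one, so a burst of concurrent callers is serialized at even spacing instead of being admitted up front. Queries are delayed, never rejected; `get_partitions` loops on `can_proceed` and sleeps for the returned remainder, so the wrapper is transparent to callers apart from the added latency and the `warn!` log line. A minimal in-crate usage sketch (the `catalog` binding and the 250 qps figure are illustrative):

    // Space partition fetches at least 4ms apart (1s / 250).
    let limited = QueryRateLimit::new(Arc::clone(&catalog), 250);

    // Sleeps until a slot is free, then delegates to the wrapped catalog.
    let files = limited.get_partitions(PartitionId::new(1)).await?;
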
@@ -39,6 +39,7 @@ pub fn log_config(config: &Config) {
         all_errors_are_fatal,
         max_num_columns_per_table,
         max_num_files_per_plan,
+        max_partition_fetch_queries_per_second,
     } = &config;
 
     let (shard_cfg_n_shards, shard_cfg_shard_id) = match shard_config {
@@ -85,6 +86,7 @@ pub fn log_config(config: &Config) {
         all_errors_are_fatal,
         max_num_columns_per_table,
         max_num_files_per_plan,
+        max_partition_fetch_queries_per_second,
         "config",
     );
 }
@@ -140,6 +140,12 @@ pub struct Config {
 
     /// max number of files per compaction plan
    pub max_num_files_per_plan: usize,
+
+    /// Limit the number of partition fetch queries to at most the specified
+    /// number of queries per second.
+    ///
+    /// Queries are smoothed over the full second.
+    pub max_partition_fetch_queries_per_second: Option<usize>,
 }
 
 impl Config {
@@ -152,6 +152,7 @@ impl TestSetupBuilder<false> {
             all_errors_are_fatal: true,
             max_num_columns_per_table: 200,
             max_num_files_per_plan: 200,
+            max_partition_fetch_queries_per_second: None,
         };
 
         let bytes_written = Arc::new(AtomicUsize::new(0));
@@ -502,6 +502,7 @@ impl Config {
             process_once: false,
             max_num_columns_per_table: 200,
             max_num_files_per_plan: 200,
+            max_partition_fetch_queries_per_second: Some(500),
         };
 
         let querier_config = QuerierConfig {
@@ -192,6 +192,8 @@ pub async fn create_compactor_server_type(
         all_errors_are_fatal: false,
         max_num_columns_per_table: compactor_config.max_num_columns_per_table,
         max_num_files_per_plan: compactor_config.max_num_files_per_plan,
+        max_partition_fetch_queries_per_second: compactor_config
+            .max_partition_fetch_queries_per_second,
     });
 
     Arc::new(CompactorServerType::new(