Merge pull request #4419 from influxdata/dom/http-service-protection-limit
feat(router2): http service protection limitpull/24376/head
commit
401009a7b6
|
@ -4977,8 +4977,10 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_urlencoded",
|
||||
"siphasher",
|
||||
"test_helpers",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"trace",
|
||||
"workspace-hack",
|
||||
|
|
|
@ -339,6 +339,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
Arc::clone(&object_store),
|
||||
&write_buffer_config,
|
||||
query_pool_name,
|
||||
1_000, // max 1,000 concurrent HTTP requests
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -67,6 +67,22 @@ pub struct Config {
|
|||
default_value = "iox-shared"
|
||||
)]
|
||||
pub(crate) query_pool_name: String,
|
||||
|
||||
/// The maximum number of simultaneous requests the HTTP server is
|
||||
/// configured to accept.
|
||||
///
|
||||
/// This number of requests, multiplied by the maximum request body size the
|
||||
/// HTTP server is configured with gives the rough amount of memory a HTTP
|
||||
/// server will use to buffer request bodies in memory.
|
||||
///
|
||||
/// A default maximum of 200 requests, multiplied by the default 10MiB
|
||||
/// maximum for HTTP request bodies == ~2GiB.
|
||||
#[clap(
|
||||
long = "--max-http-requests",
|
||||
env = "INFLUXDB_IOX_MAX_HTTP_REQUESTS",
|
||||
default_value = "200"
|
||||
)]
|
||||
pub(crate) http_request_limit: usize,
|
||||
}
|
||||
|
||||
pub async fn command(config: Config) -> Result<()> {
|
||||
|
@ -91,6 +107,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
object_store,
|
||||
&config.write_buffer_config,
|
||||
&config.query_pool_name,
|
||||
config.http_request_limit,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -151,6 +151,7 @@ pub async fn create_router2_server_type(
|
|||
object_store: Arc<DynObjectStore>,
|
||||
write_buffer_config: &WriteBufferConfig,
|
||||
query_pool_name: &str,
|
||||
request_limit: usize,
|
||||
) -> Result<Arc<dyn ServerType>> {
|
||||
// Initialise the sharded write buffer and instrument it with DML handler
|
||||
// metrics.
|
||||
|
@ -265,6 +266,7 @@ pub async fn create_router2_server_type(
|
|||
let handler_stack = Arc::new(handler_stack);
|
||||
let http = HttpDelegate::new(
|
||||
common_state.run_config().max_http_request_size,
|
||||
request_limit,
|
||||
Arc::clone(&handler_stack),
|
||||
&metrics,
|
||||
);
|
||||
|
|
|
@ -48,6 +48,8 @@ paste = "1.0.7"
|
|||
pretty_assertions = "1.2.1"
|
||||
rand = "0.8.3"
|
||||
schema = { path = "../schema" }
|
||||
test_helpers = { version = "0.1.0", path = "../test_helpers", features = ["future_timeout"] }
|
||||
tokio-stream = { version = "0.1.8", default_features = false, features = [] }
|
||||
|
||||
[[bench]]
|
||||
name = "sharder"
|
||||
|
|
|
@ -71,7 +71,7 @@ fn e2e_benchmarks(c: &mut Criterion) {
|
|||
partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))),
|
||||
);
|
||||
|
||||
HttpDelegate::new(1024, Arc::new(handler_stack), &metrics)
|
||||
HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics)
|
||||
};
|
||||
|
||||
let body_str = "platanos,tag1=A,tag2=B val=42i 123456";
|
||||
|
|
|
@ -17,6 +17,7 @@ use observability_deps::tracing::*;
|
|||
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
|
||||
use serde::Deserialize;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::{Semaphore, TryAcquireError};
|
||||
use trace::ctx::SpanContext;
|
||||
use write_summary::WriteSummary;
|
||||
|
||||
|
@ -68,6 +69,11 @@ pub enum Error {
|
|||
/// An error returned from the [`DmlHandler`].
|
||||
#[error("dml handler error: {0}")]
|
||||
DmlHandler(#[from] DmlError),
|
||||
|
||||
/// The router is currently servicing the maximum permitted number of
|
||||
/// simultaneous requests.
|
||||
#[error("this service is overloaded, please try again later")]
|
||||
RequestLimit,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
|
@ -89,6 +95,7 @@ impl Error {
|
|||
StatusCode::UNSUPPORTED_MEDIA_TYPE
|
||||
}
|
||||
Error::DmlHandler(err) => StatusCode::from(err),
|
||||
Error::RequestLimit => StatusCode::SERVICE_UNAVAILABLE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -200,17 +207,27 @@ impl<T> TryFrom<&Request<T>> for WriteInfo {
|
|||
/// Requests to some paths may be handled externally by the caller - the IOx
|
||||
/// server runner framework takes care of implementing the heath endpoint,
|
||||
/// metrics, pprof, etc.
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
pub struct HttpDelegate<D, T = SystemProvider> {
|
||||
max_request_bytes: usize,
|
||||
time_provider: T,
|
||||
dml_handler: Arc<D>,
|
||||
|
||||
// A request limiter to restrict the number of simultaneous requests this
|
||||
// router services.
|
||||
//
|
||||
// This allows the router to drop a portion of requests when experiencing an
|
||||
// unusual flood of requests (i.e. due to peer routers crashing and
|
||||
// depleting the available instances in the pool) in order to preserve
|
||||
// overall system availability, instead of OOMing or otherwise failing.
|
||||
request_sem: Semaphore,
|
||||
|
||||
write_metric_lines: U64Counter,
|
||||
write_metric_fields: U64Counter,
|
||||
write_metric_tables: U64Counter,
|
||||
write_metric_body_size: U64Counter,
|
||||
delete_metric_body_size: U64Counter,
|
||||
request_limit_rejected: U64Counter,
|
||||
}
|
||||
|
||||
impl<D> HttpDelegate<D, SystemProvider> {
|
||||
|
@ -219,7 +236,12 @@ impl<D> HttpDelegate<D, SystemProvider> {
|
|||
///
|
||||
/// HTTP request bodies are limited to `max_request_bytes` in size,
|
||||
/// returning an error if exceeded.
|
||||
pub fn new(max_request_bytes: usize, dml_handler: Arc<D>, metrics: &metric::Registry) -> Self {
|
||||
pub fn new(
|
||||
max_request_bytes: usize,
|
||||
max_requests: usize,
|
||||
dml_handler: Arc<D>,
|
||||
metrics: &metric::Registry,
|
||||
) -> Self {
|
||||
let write_metric_lines = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_write_lines_total",
|
||||
|
@ -250,16 +272,24 @@ impl<D> HttpDelegate<D, SystemProvider> {
|
|||
"cumulative byte size of successfully routed (decompressed) delete requests",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let request_limit_rejected = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_request_limit_rejected",
|
||||
"number of HTTP requests rejected due to exceeding parallel request limit",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
Self {
|
||||
max_request_bytes,
|
||||
time_provider: SystemProvider::default(),
|
||||
dml_handler,
|
||||
request_sem: Semaphore::new(max_requests),
|
||||
write_metric_lines,
|
||||
write_metric_fields,
|
||||
write_metric_tables,
|
||||
write_metric_body_size,
|
||||
delete_metric_body_size,
|
||||
request_limit_rejected,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -272,6 +302,24 @@ where
|
|||
/// Routes `req` to the appropriate handler, if any, returning the handler
|
||||
/// response.
|
||||
pub async fn route(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
// Acquire and hold a permit for the duration of this request, or return
|
||||
// a 503 if the existing requests have already exhausted the allocation.
|
||||
//
|
||||
// By dropping requests at the routing stage, before the request buffer
|
||||
// is read/decompressed, this limit can efficiently shed load to avoid
|
||||
// unnecessary memory pressure (the resource this request limit usually
|
||||
// aims to protect.)
|
||||
let _permit = match self.request_sem.try_acquire() {
|
||||
Ok(p) => p,
|
||||
Err(TryAcquireError::NoPermits) => {
|
||||
error!("simultaneous request limit exceeded - dropping request");
|
||||
self.request_limit_rejected.inc(1);
|
||||
return Err(Error::RequestLimit);
|
||||
}
|
||||
Err(e) => panic!("request limiter error: {}", e),
|
||||
};
|
||||
|
||||
// Route the request to a handler.
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&Method::POST, "/api/v2/write") => self.write_handler(req).await,
|
||||
(&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
|
||||
|
@ -451,7 +499,7 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{io::Write, iter, sync::Arc};
|
||||
use std::{io::Write, iter, sync::Arc, time::Duration};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
|
||||
|
@ -460,6 +508,8 @@ mod tests {
|
|||
use metric::{Attributes, Metric};
|
||||
use mutable_batch::column::ColumnData;
|
||||
use mutable_batch_lp::LineWriteError;
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
|
||||
|
||||
|
@ -479,9 +529,10 @@ mod tests {
|
|||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
|
||||
assert!(counter > 0, "metric {} did not record any values", name);
|
||||
if let Some(want) = value {
|
||||
assert_eq!(want, counter, "metric does not have expected value");
|
||||
} else {
|
||||
assert!(counter > 0, "metric {} did not record any values", name);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,7 +607,7 @@ mod tests {
|
|||
.with_delete_return($dml_delete_handler)
|
||||
);
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler), &metrics);
|
||||
let delegate = HttpDelegate::new(MAX_BYTES, 100, Arc::clone(&dml_handler), &metrics);
|
||||
|
||||
let got = delegate.route(request).await;
|
||||
assert_matches!(got, $want_result);
|
||||
|
@ -1074,4 +1125,130 @@ mod tests {
|
|||
want_dml_calls = []
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
enum MockError {
|
||||
#[error("bad stuff")]
|
||||
Terrible,
|
||||
}
|
||||
|
||||
// This test ensures the request limiter drops requests once the configured
|
||||
// number of simultaneous requests are being serviced.
|
||||
#[tokio::test]
|
||||
async fn test_request_limit_enforced() {
|
||||
let dml_handler = Arc::new(MockDmlHandler::default());
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let delegate = Arc::new(HttpDelegate::new(
|
||||
MAX_BYTES,
|
||||
1,
|
||||
Arc::clone(&dml_handler),
|
||||
&metrics,
|
||||
));
|
||||
|
||||
// Use a channel to hold open the request.
|
||||
//
|
||||
// This causes the request handler to block reading the request body
|
||||
// until tx is dropped and the body stream ends, completing the body and
|
||||
// unblocking the request handler.
|
||||
let (body_1_tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let request_1 = Request::builder()
|
||||
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
|
||||
.method("POST")
|
||||
.body(Body::wrap_stream(ReceiverStream::new(rx)))
|
||||
.unwrap();
|
||||
|
||||
// Spawn the first request and push at least 2 body chunks through tx.
|
||||
//
|
||||
// Spawning and writing through tx will avoid any race between which
|
||||
// request handler task is scheduled first by ensuring this request is
|
||||
// being actively read from - the first send() could fill the channel
|
||||
// buffer of 1, and therefore successfully returning from the second
|
||||
// send() MUST indicate the stream is being read by the handler (and
|
||||
// therefore the task has spawned and the request is actively being
|
||||
// serviced).
|
||||
let req_1 = tokio::spawn({
|
||||
let delegate = Arc::clone(&delegate);
|
||||
async move { delegate.route(request_1).await }
|
||||
});
|
||||
body_1_tx
|
||||
.send(Ok("cpu "))
|
||||
.await
|
||||
.expect("req1 closed channel");
|
||||
body_1_tx
|
||||
.send(Ok("field=1i"))
|
||||
// Never hang if there is no handler reading this request
|
||||
.with_timeout_panic(Duration::from_secs(1))
|
||||
.await
|
||||
.expect("req1 closed channel");
|
||||
|
||||
//
|
||||
// At this point we can be certain that request 1 is being actively
|
||||
// serviced, and the HTTP server is in a state that should cause the
|
||||
// immediate drop of any subsequent requests.
|
||||
//
|
||||
|
||||
assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(0));
|
||||
|
||||
// Retain this tx handle for the second request and use it to prove the
|
||||
// request dropped before anything was read from the body - the request
|
||||
// should error _before_ anything is sent over tx, and subsequently
|
||||
// attempting to send something over tx after the error should fail with
|
||||
// a "channel closed" error.
|
||||
let (body_2_tx, rx) = tokio::sync::mpsc::channel::<Result<&'static str, MockError>>(1);
|
||||
let request_2 = Request::builder()
|
||||
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
|
||||
.method("POST")
|
||||
.body(Body::wrap_stream(ReceiverStream::new(rx)))
|
||||
.unwrap();
|
||||
|
||||
// Attempt to service request 2.
|
||||
//
|
||||
// This should immediately return without requiring any body chunks to
|
||||
// be sent through tx.
|
||||
let err = delegate
|
||||
.route(request_2)
|
||||
.with_timeout_panic(Duration::from_secs(1))
|
||||
.await
|
||||
.expect_err("second request should be rejected");
|
||||
assert_matches!(err, Error::RequestLimit);
|
||||
|
||||
// Ensure the "rejected requests" metric was incremented
|
||||
assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(1));
|
||||
|
||||
// Prove the dropped request body is not being read:
|
||||
body_2_tx
|
||||
.send(Ok("wat"))
|
||||
.await
|
||||
.expect_err("channel should be closed");
|
||||
|
||||
// Cause the first request handler to bail, releasing request capacity
|
||||
// back to the router.
|
||||
body_1_tx
|
||||
.send(Err(MockError::Terrible))
|
||||
.await
|
||||
.expect("req1 closed channel");
|
||||
// Wait for the handler to return to avoid any races.
|
||||
let req_1 = req_1
|
||||
.with_timeout_panic(Duration::from_secs(1))
|
||||
.await
|
||||
.expect("request 1 handler should not panic")
|
||||
.expect_err("request should fail");
|
||||
assert_matches!(req_1, Error::ClientHangup(_));
|
||||
|
||||
// And submit a third request that should be serviced now there's no
|
||||
// concurrent request being handled.
|
||||
let request_3 = Request::builder()
|
||||
.uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
|
||||
.method("POST")
|
||||
.body(Body::from(""))
|
||||
.unwrap();
|
||||
delegate
|
||||
.route(request_3)
|
||||
.with_timeout_panic(Duration::from_secs(1))
|
||||
.await
|
||||
.expect("empty write should succeed");
|
||||
|
||||
// And the request rejected metric must remain unchanged
|
||||
assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ impl TestContext {
|
|||
|
||||
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
|
||||
|
||||
let delegate = HttpDelegate::new(1024, Arc::new(handler_stack), &metrics);
|
||||
let delegate = HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics);
|
||||
|
||||
Self {
|
||||
delegate,
|
||||
|
|
Loading…
Reference in New Issue