diff --git a/Cargo.lock b/Cargo.lock
index 2a660261d4..1bc3098eef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4977,8 +4977,10 @@ dependencies = [
  "serde",
  "serde_urlencoded",
  "siphasher",
+ "test_helpers",
  "thiserror",
  "tokio",
+ "tokio-stream",
  "tonic",
  "trace",
  "workspace-hack",
diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs
index 81a239f0a1..e5f1fb36c1 100644
--- a/influxdb_iox/src/commands/run/all_in_one.rs
+++ b/influxdb_iox/src/commands/run/all_in_one.rs
@@ -339,6 +339,7 @@ pub async fn command(config: Config) -> Result<()> {
         Arc::clone(&object_store),
         &write_buffer_config,
         query_pool_name,
+        1_000, // max 1,000 concurrent HTTP requests
     )
     .await?;
 
diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs
index 9f198e8add..6fc4f63d1e 100644
--- a/influxdb_iox/src/commands/run/router2.rs
+++ b/influxdb_iox/src/commands/run/router2.rs
@@ -67,6 +67,22 @@ pub struct Config {
         default_value = "iox-shared"
     )]
     pub(crate) query_pool_name: String,
+
+    /// The maximum number of simultaneous requests the HTTP server is
+    /// configured to accept.
+    ///
+    /// This number of requests, multiplied by the maximum request body size
+    /// the HTTP server is configured with, gives a rough bound on the amount
+    /// of memory the HTTP server will use to buffer request bodies.
+    ///
+    /// A default maximum of 200 requests, multiplied by the default 10MiB
+    /// maximum for HTTP request bodies == ~2GiB.
+    #[clap(
+        long = "--max-http-requests",
+        env = "INFLUXDB_IOX_MAX_HTTP_REQUESTS",
+        default_value = "200"
+    )]
+    pub(crate) http_request_limit: usize,
 }
 
 pub async fn command(config: Config) -> Result<()> {
@@ -91,6 +107,7 @@ pub async fn command(config: Config) -> Result<()> {
         object_store,
         &config.write_buffer_config,
         &config.query_pool_name,
+        config.http_request_limit,
     )
     .await?;
 
diff --git a/ioxd_router2/src/lib.rs b/ioxd_router2/src/lib.rs
index 5707f2a6a0..f1c9864d1f 100644
--- a/ioxd_router2/src/lib.rs
+++ b/ioxd_router2/src/lib.rs
@@ -151,6 +151,7 @@ pub async fn create_router2_server_type(
     object_store: Arc<ObjectStore>,
     write_buffer_config: &WriteBufferConfig,
     query_pool_name: &str,
+    request_limit: usize,
 ) -> Result<Arc<dyn ServerType>> {
     // Initialise the sharded write buffer and instrument it with DML handler
     // metrics.
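The new `--max-http-requests` flag follows the same clap pattern as the existing router2 options. As a quick illustration, here is a minimal, self-contained sketch of that pattern (assuming clap v3 with the `derive` feature; `DemoConfig` is a hypothetical name, not part of the PR):

```rust
// Hypothetical standalone demo of the flag pattern used above. clap v3
// strips the leading dashes from `long`, so this parses `--max-http-requests`.
use clap::Parser;

#[derive(Debug, Parser)]
struct DemoConfig {
    /// Maximum number of simultaneous HTTP requests to accept.
    #[clap(
        long = "--max-http-requests",
        env = "INFLUXDB_IOX_MAX_HTTP_REQUESTS",
        default_value = "200"
    )]
    http_request_limit: usize,
}

fn main() {
    // With no CLI argument and no env var set, this yields the default of 200;
    // `--max-http-requests 500` or the env var overrides it.
    let config = DemoConfig::parse();
    println!("request limit: {}", config.http_request_limit);
}
```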
@@ -265,6 +266,7 @@ pub async fn create_router2_server_type(
     let handler_stack = Arc::new(handler_stack);
     let http = HttpDelegate::new(
         common_state.run_config().max_http_request_size,
+        request_limit,
         Arc::clone(&handler_stack),
         &metrics,
     );
diff --git a/router2/Cargo.toml b/router2/Cargo.toml
index 12888337ba..d06ec2c9a5 100644
--- a/router2/Cargo.toml
+++ b/router2/Cargo.toml
@@ -48,6 +48,8 @@ paste = "1.0.7"
 pretty_assertions = "1.2.1"
 rand = "0.8.3"
 schema = { path = "../schema" }
+test_helpers = { version = "0.1.0", path = "../test_helpers", features = ["future_timeout"] }
+tokio-stream = { version = "0.1.8", default_features = false, features = [] }
 
 [[bench]]
 name = "sharder"
diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs
index 047f7cf460..65f802f130 100644
--- a/router2/benches/e2e.rs
+++ b/router2/benches/e2e.rs
@@ -71,7 +71,7 @@ fn e2e_benchmarks(c: &mut Criterion) {
             partitioner.and_then(WriteSummaryAdapter::new(FanOutAdaptor::new(write_buffer))),
         );
 
-        HttpDelegate::new(1024, Arc::new(handler_stack), &metrics)
+        HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics)
     };
 
     let body_str = "platanos,tag1=A,tag2=B val=42i 123456";
diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs
index badf8c9b58..d530a97ae7 100644
--- a/router2/src/server/http.rs
+++ b/router2/src/server/http.rs
@@ -17,6 +17,7 @@ use observability_deps::tracing::*;
 use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
 use serde::Deserialize;
 use thiserror::Error;
+use tokio::sync::{Semaphore, TryAcquireError};
 use trace::ctx::SpanContext;
 use write_summary::WriteSummary;
 
@@ -68,6 +69,11 @@ pub enum Error {
     /// An error returned from the [`DmlHandler`].
     #[error("dml handler error: {0}")]
     DmlHandler(#[from] DmlError),
+
+    /// The router is currently servicing the maximum permitted number of
+    /// simultaneous requests.
+    #[error("this service is overloaded, please try again later")]
+    RequestLimit,
 }
 
 impl Error {
@@ -89,6 +95,7 @@ impl Error {
                 StatusCode::UNSUPPORTED_MEDIA_TYPE
             }
             Error::DmlHandler(err) => StatusCode::from(err),
+            Error::RequestLimit => StatusCode::SERVICE_UNAVAILABLE,
         }
     }
 }
@@ -200,17 +207,27 @@ impl<T> TryFrom<&Request<T>> for WriteInfo {
 /// Requests to some paths may be handled externally by the caller - the IOx
 /// server runner framework takes care of implementing the health endpoint,
 /// metrics, pprof, etc.
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct HttpDelegate<D, T = SystemProvider> {
     max_request_bytes: usize,
     time_provider: T,
     dml_handler: Arc<D>,
 
+    // A request limiter to restrict the number of simultaneous requests this
+    // router services.
+    //
+    // This allows the router to drop a portion of requests when experiencing
+    // an unusual flood of requests (e.g. due to peer routers crashing and
+    // depleting the available instances in the pool) in order to preserve
+    // overall system availability, instead of OOMing or otherwise failing.
+    request_sem: Semaphore,
+
     write_metric_lines: U64Counter,
     write_metric_fields: U64Counter,
     write_metric_tables: U64Counter,
     write_metric_body_size: U64Counter,
     delete_metric_body_size: U64Counter,
+    request_limit_rejected: U64Counter,
 }
 
 impl<D> HttpDelegate<D, SystemProvider> {
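For context on the hunks above: mapping the new `RequestLimit` variant to `503 Service Unavailable` tells well-behaved clients the condition is transient and they should back off and retry. A minimal sketch of that mapping in isolation (a hypothetical `DemoError`, not the PR's `Error` enum; assumes hyper 0.14 and thiserror):

```rust
// Hypothetical demo of the error-to-status mapping the diff extends.
use hyper::StatusCode;
use thiserror::Error;

#[derive(Debug, Error)]
enum DemoError {
    #[error("this service is overloaded, please try again later")]
    RequestLimit,
}

impl DemoError {
    fn as_status_code(&self) -> StatusCode {
        match self {
            // 503 signals a transient condition; clients should back off
            // and retry rather than treating the response as fatal.
            DemoError::RequestLimit => StatusCode::SERVICE_UNAVAILABLE,
        }
    }
}

fn main() {
    assert_eq!(
        DemoError::RequestLimit.as_status_code(),
        StatusCode::SERVICE_UNAVAILABLE
    );
}
```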
@@ -219,7 +236,12 @@ impl<D> HttpDelegate<D, SystemProvider> {
     ///
     /// HTTP request bodies are limited to `max_request_bytes` in size,
     /// returning an error if exceeded.
-    pub fn new(max_request_bytes: usize, dml_handler: Arc<D>, metrics: &metric::Registry) -> Self {
+    pub fn new(
+        max_request_bytes: usize,
+        max_requests: usize,
+        dml_handler: Arc<D>,
+        metrics: &metric::Registry,
+    ) -> Self {
         let write_metric_lines = metrics
             .register_metric::<U64Counter>(
                 "http_write_lines_total",
@@ -250,16 +272,24 @@ impl<D> HttpDelegate<D, SystemProvider> {
                 "cumulative byte size of successfully routed (decompressed) delete requests",
             )
             .recorder(&[]);
+        let request_limit_rejected = metrics
+            .register_metric::<U64Counter>(
+                "http_request_limit_rejected",
+                "number of HTTP requests rejected due to exceeding the parallel request limit",
+            )
+            .recorder(&[]);
 
         Self {
             max_request_bytes,
             time_provider: SystemProvider::default(),
             dml_handler,
+            request_sem: Semaphore::new(max_requests),
             write_metric_lines,
             write_metric_fields,
             write_metric_tables,
             write_metric_body_size,
             delete_metric_body_size,
+            request_limit_rejected,
         }
     }
 }
@@ -272,6 +302,24 @@ where
     /// Routes `req` to the appropriate handler, if any, returning the handler
     /// response.
     pub async fn route(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
+        // Acquire and hold a permit for the duration of this request, or return
+        // a 503 if the existing requests have already exhausted the allocation.
+        //
+        // By dropping requests at the routing stage, before the request buffer
+        // is read/decompressed, this limit can efficiently shed load to avoid
+        // unnecessary memory pressure (the resource this request limit usually
+        // aims to protect).
+        let _permit = match self.request_sem.try_acquire() {
+            Ok(p) => p,
+            Err(TryAcquireError::NoPermits) => {
+                error!("simultaneous request limit exceeded - dropping request");
+                self.request_limit_rejected.inc(1);
+                return Err(Error::RequestLimit);
+            }
+            Err(e) => panic!("request limiter error: {}", e),
+        };
+
+        // Route the request to a handler.
         match (req.method(), req.uri().path()) {
             (&Method::POST, "/api/v2/write") => self.write_handler(req).await,
             (&Method::POST, "/api/v2/delete") => self.delete_handler(req).await,
@@ -451,7 +499,7 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::{io::Write, iter, sync::Arc};
+    use std::{io::Write, iter, sync::Arc, time::Duration};
 
     use assert_matches::assert_matches;
 
@@ -460,6 +508,8 @@ mod tests {
     use metric::{Attributes, Metric};
     use mutable_batch::column::ColumnData;
     use mutable_batch_lp::LineWriteError;
+    use test_helpers::timeout::FutureTimeout;
+    use tokio_stream::wrappers::ReceiverStream;
 
     use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
 
@@ -479,9 +529,10 @@ mod tests {
             .expect("failed to get observer")
             .fetch();
 
-        assert!(counter > 0, "metric {} did not record any values", name);
         if let Some(want) = value {
             assert_eq!(want, counter, "metric does not have expected value");
+        } else {
+            assert!(counter > 0, "metric {} did not record any values", name);
         }
     }
 
@@ -556,7 +607,7 @@ mod tests {
                 .with_delete_return($dml_delete_handler)
         );
         let metrics = Arc::new(metric::Registry::default());
-        let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler), &metrics);
+        let delegate = HttpDelegate::new(MAX_BYTES, 100, Arc::clone(&dml_handler), &metrics);
 
         let got = delegate.route(request).await;
         assert_matches!(got, $want_result);
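The `try_acquire()` call in `route()` above is the core of the change: a permit is claimed before any body bytes are read, so an overloaded router refuses work at the cheapest possible point. A self-contained sketch of the same pattern (hypothetical `handle`/`Overloaded` names, plain tokio, not the PR's `HttpDelegate`):

```rust
use tokio::sync::{Semaphore, TryAcquireError};

#[derive(Debug)]
struct Overloaded;

async fn handle(sem: &Semaphore) -> Result<&'static str, Overloaded> {
    // try_acquire() never waits: either a permit is free, or the caller is
    // rejected immediately. The permit is released when `_permit` drops at
    // the end of this function, admitting the next request.
    let _permit = match sem.try_acquire() {
        Ok(p) => p,
        Err(TryAcquireError::NoPermits) => return Err(Overloaded),
        Err(e) => panic!("request limiter error: {}", e),
    };
    // ... read the body and service the request while holding the permit ...
    Ok("done")
}

#[tokio::main]
async fn main() {
    let sem = Semaphore::new(1);

    // Hold the only permit to simulate one in-flight request...
    let held = sem.try_acquire().unwrap();
    // ...so a second request is shed instead of queued.
    assert!(handle(&sem).await.is_err());

    // Once the first request completes, capacity is available again.
    drop(held);
    assert!(handle(&sem).await.is_ok());
}
```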
@@ -1074,4 +1125,130 @@ mod tests {
             want_dml_calls = []
         );
     }
+
+    #[derive(Debug, Error)]
+    enum MockError {
+        #[error("bad stuff")]
+        Terrible,
+    }
+
+    // This test ensures the request limiter drops requests once the
+    // configured number of simultaneous requests is being serviced.
+    #[tokio::test]
+    async fn test_request_limit_enforced() {
+        let dml_handler = Arc::new(MockDmlHandler::default());
+        let metrics = Arc::new(metric::Registry::default());
+        let delegate = Arc::new(HttpDelegate::new(
+            MAX_BYTES,
+            1,
+            Arc::clone(&dml_handler),
+            &metrics,
+        ));
+
+        // Use a channel to hold open the request.
+        //
+        // This causes the request handler to block reading the request body
+        // until tx is dropped and the body stream ends, completing the body
+        // and unblocking the request handler.
+        let (body_1_tx, rx) = tokio::sync::mpsc::channel(1);
+        let request_1 = Request::builder()
+            .uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
+            .method("POST")
+            .body(Body::wrap_stream(ReceiverStream::new(rx)))
+            .unwrap();
+
+        // Spawn the first request and push at least 2 body chunks through tx.
+        //
+        // Spawning and writing through tx will avoid any race between which
+        // request handler task is scheduled first by ensuring this request is
+        // being actively read from - the first send() could fill the channel
+        // buffer of 1, and therefore successfully returning from the second
+        // send() MUST indicate the stream is being read by the handler (and
+        // therefore the task has spawned and the request is actively being
+        // serviced).
+        let req_1 = tokio::spawn({
+            let delegate = Arc::clone(&delegate);
+            async move { delegate.route(request_1).await }
+        });
+        body_1_tx
+            .send(Ok("cpu "))
+            .await
+            .expect("req1 closed channel");
+        body_1_tx
+            .send(Ok("field=1i"))
+            // Never hang if there is no handler reading this request
+            .with_timeout_panic(Duration::from_secs(1))
+            .await
+            .expect("req1 closed channel");
+
+        //
+        // At this point we can be certain that request 1 is being actively
+        // serviced, and the HTTP server is in a state that should cause the
+        // immediate drop of any subsequent requests.
+        //
+
+        assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(0));
+
+        // Retain this tx handle for the second request and use it to prove
+        // the request was dropped before anything was read from the body -
+        // the request should error _before_ anything is sent over tx, and
+        // subsequently attempting to send something over tx after the error
+        // should fail with a "channel closed" error.
+        let (body_2_tx, rx) = tokio::sync::mpsc::channel::<Result<&'static str, MockError>>(1);
+        let request_2 = Request::builder()
+            .uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
+            .method("POST")
+            .body(Body::wrap_stream(ReceiverStream::new(rx)))
+            .unwrap();
+
+        // Attempt to service request 2.
+        //
+        // This should immediately return without requiring any body chunks to
+        // be sent through tx.
+        let err = delegate
+            .route(request_2)
+            .with_timeout_panic(Duration::from_secs(1))
+            .await
+            .expect_err("second request should be rejected");
+        assert_matches!(err, Error::RequestLimit);
+
+        // Ensure the "rejected requests" metric was incremented.
+        assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(1));
+
+        // Prove the dropped request body is not being read:
+        body_2_tx
+            .send(Ok("wat"))
+            .await
+            .expect_err("channel should be closed");
+
+        // Cause the first request handler to bail, releasing request capacity
+        // back to the router.
+        body_1_tx
+            .send(Err(MockError::Terrible))
+            .await
+            .expect("req1 closed channel");
+
+        // Wait for the handler to return to avoid any races.
+        let req_1 = req_1
+            .with_timeout_panic(Duration::from_secs(1))
+            .await
+            .expect("request 1 handler should not panic")
+            .expect_err("request should fail");
+        assert_matches!(req_1, Error::ClientHangup(_));
+
+        // And submit a third request that should be serviced now there's no
+        // concurrent request being handled.
+        let request_3 = Request::builder()
+            .uri("https://bananas.example/api/v2/write?org=bananas&bucket=test")
+            .method("POST")
+            .body(Body::from(""))
+            .unwrap();
+        delegate
+            .route(request_3)
+            .with_timeout_panic(Duration::from_secs(1))
+            .await
+            .expect("empty write should succeed");
+
+        // And the request rejected metric must remain unchanged.
+        assert_metric_hit(&*metrics, "http_request_limit_rejected", Some(1));
+    }
 }
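The test above hinges on backing the request body with an mpsc channel, so the test decides exactly when body bytes arrive and when the stream ends. A standalone sketch of that technique (hypothetical, outside the PR; assumes hyper 0.14 with its `stream` feature, plus tokio and tokio-stream):

```rust
use hyper::Body;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    // Items are Result<chunk, error>; hyper accepts any error convertible
    // to a boxed std::error::Error, so io::Error works for a demo.
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<&'static str, std::io::Error>>(1);
    let body = Body::wrap_stream(ReceiverStream::new(rx));

    // The body yields chunks only as the sender provides them...
    tx.send(Ok("cpu field=1i")).await.unwrap();
    // ...and terminates once every sender is dropped, unblocking readers.
    drop(tx);

    let bytes = hyper::body::to_bytes(body).await.unwrap();
    assert_eq!(&bytes[..], b"cpu field=1i");
}
```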
diff --git a/router2/tests/http.rs b/router2/tests/http.rs
index 51a8b28a90..5a4248bf9d 100644
--- a/router2/tests/http.rs
+++ b/router2/tests/http.rs
@@ -117,7 +117,7 @@ impl TestContext {
 
         let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
 
-        let delegate = HttpDelegate::new(1024, Arc::new(handler_stack), &metrics);
+        let delegate = HttpDelegate::new(1024, 100, Arc::new(handler_stack), &metrics);
 
         Self {
             delegate,
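Finally, every potentially-blocking await in the new test is bounded by `with_timeout_panic`, pulled in via the `test_helpers` crate's `future_timeout` feature in the Cargo.toml change above. A rough equivalent of the idea using plain tokio (an illustration only, not the `test_helpers` implementation):

```rust
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let fut = async { 42 };

    // Fail fast instead of hanging: panic if `fut` does not complete within
    // one second, mirroring how the test bounds each await that could
    // otherwise block forever on a wedged request handler.
    let v = timeout(Duration::from_secs(1), fut)
        .await
        .expect("future timed out");
    assert_eq!(v, 42);
}
```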