Add HTTP write throttling.

This commit adds throttling to the HTTP write endpoints based on
queue depth and, optionally, a timeout. Two queues exist: `enqueued`
and `current`. The `current` queue limits the number of requests
that can be processed concurrently. The `enqueued` queue limits the
number of requests that can wait to be processed.

If the timeout is exceeded or the `enqueued` queue is full, a
`503 Service Unavailable` response is returned and the error is
logged.

By default, these options are turned off.
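
As a hedged sketch (the limit values here are illustrative, not part of this change), enabling the
limits through the new config fields might look like:

    c := httpd.NewConfig()                   // illustrative values only
    c.MaxConcurrentWriteLimit = 10           // at most 10 writes processed at once
    c.MaxEnqueuedWriteLimit = 100            // up to 100 more writes may wait in the queue
    c.EnqueuedWriteTimeout = 5 * time.Second // a queued write is rejected with a 503 after 5s
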
pull/9877/head
Ben Johnson 2018-05-18 14:16:18 -06:00
parent b5f3ec6372
commit 8a74c6759f
4 changed files with 221 additions and 20 deletions

@@ -272,6 +272,18 @@
# The maximum size of a client request body, in bytes. Setting this value to 0 disables the limit.
# max-body-size = 25000000
# The maximum number of writes processed concurrently.
# Setting this to 0 disables the limit.
# max-concurrent-write-limit = 0
# The maximum number of writes queued for processing.
# Setting this to 0 disables the limit.
# max-enqueued-write-limit = 0
# The maximum duration for a write to wait in the queue to be processed.
# Setting this to 0 or setting max-concurrent-write-limit to 0 disables the limit.
# enqueued-write-timeout = 0
###
### [ifql]

@@ -1,6 +1,8 @@
package httpd
import (
"time"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/toml"
)
@@ -17,30 +19,36 @@ const (
// DefaultMaxBodySize is the default maximum size of a client request body, in bytes. Specify 0 for no limit.
DefaultMaxBodySize = 25e6
// DefaultEnqueuedWriteTimeout is the maximum time a write request can wait to be processed.
DefaultEnqueuedWriteTimeout = 30 * time.Second
)
// Config represents a configuration for a HTTP service.
type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
AuthEnabled bool `toml:"auth-enabled"`
LogEnabled bool `toml:"log-enabled"`
SuppressWriteLog bool `toml:"suppress-write-log"`
WriteTracing bool `toml:"write-tracing"`
PprofEnabled bool `toml:"pprof-enabled"`
HTTPSEnabled bool `toml:"https-enabled"`
HTTPSCertificate string `toml:"https-certificate"`
HTTPSPrivateKey string `toml:"https-private-key"`
MaxRowLimit int `toml:"max-row-limit"`
MaxConnectionLimit int `toml:"max-connection-limit"`
SharedSecret string `toml:"shared-secret"`
Realm string `toml:"realm"`
UnixSocketEnabled bool `toml:"unix-socket-enabled"`
UnixSocketGroup *toml.Group `toml:"unix-socket-group"`
UnixSocketPermissions toml.FileMode `toml:"unix-socket-permissions"`
BindSocket string `toml:"bind-socket"`
MaxBodySize int `toml:"max-body-size"`
AccessLogPath string `toml:"access-log-path"`
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
AuthEnabled bool `toml:"auth-enabled"`
LogEnabled bool `toml:"log-enabled"`
SuppressWriteLog bool `toml:"suppress-write-log"`
WriteTracing bool `toml:"write-tracing"`
PprofEnabled bool `toml:"pprof-enabled"`
HTTPSEnabled bool `toml:"https-enabled"`
HTTPSCertificate string `toml:"https-certificate"`
HTTPSPrivateKey string `toml:"https-private-key"`
MaxRowLimit int `toml:"max-row-limit"`
MaxConnectionLimit int `toml:"max-connection-limit"`
SharedSecret string `toml:"shared-secret"`
Realm string `toml:"realm"`
UnixSocketEnabled bool `toml:"unix-socket-enabled"`
UnixSocketGroup *toml.Group `toml:"unix-socket-group"`
UnixSocketPermissions toml.FileMode `toml:"unix-socket-permissions"`
BindSocket string `toml:"bind-socket"`
MaxBodySize int `toml:"max-body-size"`
AccessLogPath string `toml:"access-log-path"`
MaxConcurrentWriteLimit int `toml:"max-concurrent-write-limit"`
MaxEnqueuedWriteLimit int `toml:"max-enqueued-write-limit"`
EnqueuedWriteTimeout time.Duration `toml:"enqueued-write-timeout"`
}
// NewConfig returns a new Config with default settings.
@@ -58,6 +66,7 @@ func NewConfig() Config {
UnixSocketPermissions: 0777,
BindSocket: DefaultBindSocket,
MaxBodySize: DefaultMaxBodySize,
EnqueuedWriteTimeout: DefaultEnqueuedWriteTimeout,
}
}
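
Worth noting from the defaults above (a sketch, not part of the diff): only the timeout gets a
non-zero default, so write throttling stays off out of the box.

    c := httpd.NewConfig()
    // c.MaxConcurrentWriteLimit == 0          -> write throttling is disabled
    // c.MaxEnqueuedWriteLimit == 0            -> no queue limit
    // c.EnqueuedWriteTimeout == 30s (default) -> has no effect until the concurrent limit is set
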

@@ -115,6 +115,7 @@ type Handler struct {
stats *Statistics
requestTracker *RequestTracker
writeThrottler *Throttler
}
// NewHandler returns a new instance of handler with routes.
@@ -128,6 +129,10 @@ func NewHandler(c Config) *Handler {
requestTracker: NewRequestTracker(),
}
// Limit the number of concurrent & enqueued write requests.
h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit)
h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout
// Disable the write log if it has been suppressed.
writeLogEnabled := c.LogEnabled
if c.SuppressWriteLog {
@@ -285,6 +290,15 @@ func (h *Handler) AddRoutes(routes ...Route) {
handler = http.HandlerFunc(hf)
}
// Throttle route if this is a write endpoint.
if r.Method == http.MethodPost {
switch r.Pattern {
case "/write", "/api/v1/prom/write":
handler = h.writeThrottler.Handler(handler)
default:
}
}
handler = h.responseWriter(handler)
if r.Gzipped {
handler = gzipFilter(handler)
@@ -1647,3 +1661,71 @@ func (r *Response) Error() error {
}
return nil
}
// Throttler represents an HTTP throttler that limits the number of concurrent
// requests being processed as well as the number of enqueued requests.
type Throttler struct {
current chan struct{}
enqueued chan struct{}
// Maximum amount of time requests can wait in queue.
// Must be set before adding middleware.
EnqueueTimeout time.Duration
Logger *zap.Logger
}
// NewThrottler returns a new instance of Throttler that limits to concurrentN
// requests processed at a time and maxEnqueueN requests waiting to be processed.
func NewThrottler(concurrentN, maxEnqueueN int) *Throttler {
return &Throttler{
current: make(chan struct{}, concurrentN),
enqueued: make(chan struct{}, concurrentN+maxEnqueueN),
Logger: zap.NewNop(),
}
}
// Handler wraps h in a middleware handler that throttles requests.
func (t *Throttler) Handler(h http.Handler) http.Handler {
timeout := t.EnqueueTimeout
// Return the original handler if the concurrent request limit is zero.
if cap(t.current) == 0 {
return h
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Start a timer to limit enqueued request times.
var timerCh <-chan time.Time
if timeout > 0 {
timer := time.NewTimer(timeout)
defer timer.Stop()
timerCh = timer.C
}
// Wait for a spot in the queue.
if cap(t.enqueued) > cap(t.current) {
select {
case t.enqueued <- struct{}{}:
defer func() { <-t.enqueued }()
default:
t.Logger.Warn("request throttled, queue full", zap.Duration("d", timeout))
http.Error(w, "request throttled, queue full", http.StatusServiceUnavailable)
return
}
}
// Wait for a spot in the list of concurrent requests.
select {
case t.current <- struct{}{}:
case <-timerCh:
t.Logger.Warn("request throttled, exceeds timeout", zap.Duration("d", timeout))
http.Error(w, "request throttled, exceeds timeout", http.StatusServiceUnavailable)
return
}
defer func() { <-t.current }()
// Execute request.
h.ServeHTTP(w, r)
})
}
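
Beyond the /write and /api/v1/prom/write routes wired up in AddRoutes, the throttler can wrap any
http.Handler. A minimal, self-contained sketch; the limits, the address, and the
services/httpd import path are assumptions for illustration:

    package main

    import (
        "log"
        "net/http"
        "time"

        "github.com/influxdata/influxdb/services/httpd" // assumed import path
    )

    func main() {
        // Two requests run concurrently, one more may wait, and a queued
        // request is rejected with a 503 after waiting two seconds.
        t := httpd.NewThrottler(2, 1)
        t.EnqueueTimeout = 2 * time.Second

        slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            time.Sleep(time.Second)
            w.WriteHeader(http.StatusNoContent)
        })

        http.Handle("/write", t.Handler(slow))
        log.Fatal(http.ListenAndServe(":8086", nil)) // illustrative address
    }
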

@@ -11,8 +11,10 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
@@ -926,6 +928,102 @@ func TestHandler_XRequestId(t *testing.T) {
}
}
func TestThrottler_Handler(t *testing.T) {
t.Run("OK", func(t *testing.T) {
throttler := httpd.NewThrottler(2, 98)
// Send the total number of concurrent requests to the channel.
var concurrentN int32
concurrentCh := make(chan int)
h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&concurrentN, 1)
concurrentCh <- int(atomic.LoadInt32(&concurrentN))
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&concurrentN, -1)
}))
// Execute requests concurrently.
const n = 100
for i := 0; i < n; i++ {
go func() { h.ServeHTTP(nil, nil) }()
}
// Read the number of concurrent requests for every execution.
for i := 0; i < n; i++ {
if v := <-concurrentCh; v > 2 {
t.Fatalf("concurrent requests exceed maximum: %d", v)
}
}
})
t.Run("ErrTimeout", func(t *testing.T) {
throttler := httpd.NewThrottler(2, 1)
throttler.EnqueueTimeout = 1 * time.Millisecond
resp := make(chan struct{})
h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp <- struct{}{}
}))
pending := make(chan struct{}, 2)
// First two requests should execute immediately.
go func() { pending <- struct{}{}; h.ServeHTTP(nil, nil) }()
go func() { pending <- struct{}{}; h.ServeHTTP(nil, nil) }()
<-pending
<-pending
// Third request should be enqueued but timeout.
w := httptest.NewRecorder()
h.ServeHTTP(w, nil)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("unexpected status code: %d", w.Code)
} else if body := w.Body.String(); body != "request throttled, exceeds timeout\n" {
t.Fatalf("unexpected response body: %q", body)
}
// Allow 2 existing requests to complete.
<-resp
<-resp
})
t.Run("ErrFull", func(t *testing.T) {
delay := 100 * time.Millisecond
if os.Getenv("CI") != "" {
delay = 2 * time.Second
}
throttler := httpd.NewThrottler(2, 1)
resp := make(chan struct{})
h := throttler.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp <- struct{}{}
}))
// First two requests should execute immediately and third should be queued.
go func() { h.ServeHTTP(nil, nil) }()
go func() { h.ServeHTTP(nil, nil) }()
go func() { h.ServeHTTP(nil, nil) }()
time.Sleep(delay)
// Fourth request should fail when trying to enqueue.
w := httptest.NewRecorder()
h.ServeHTTP(w, nil)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("unexpected status code: %d", w.Code)
} else if body := w.Body.String(); body != "request throttled, queue full\n" {
t.Fatalf("unexpected response body: %q", body)
}
// Allow 3 existing requests to complete.
<-resp
<-resp
<-resp
})
}
// NewHandler represents a test wrapper for httpd.Handler.
type Handler struct {
*httpd.Handler