feat(http): add configurable limit to points batch size on write endpoint (#16469)

George, 2020-01-10 11:02:44 +00:00 (committed by GitHub)
parent 63b3a07281, commit a0c18c9ef7
5 changed files with 129 additions and 45 deletions


@@ -36,6 +36,7 @@
 1. [16430](https://github.com/influxdata/influxdb/pull/16430): Added toggle to table thresholds to allow users to choose between setting threshold colors to text or background
 1. [16418](https://github.com/influxdata/influxdb/pull/16418): Add Developer Documentation
 1. [16260](https://github.com/influxdata/influxdb/pull/16260): Capture User-Agent header as query source for logging purposes
+1. [16469](https://github.com/influxdata/influxdb/pull/16469): Add support for configurable max batch size in points write handler
 ### Bug Fixes


@@ -24,6 +24,7 @@ const (
 	ETooManyRequests = "too many requests"
 	EUnauthorized = "unauthorized"
 	EMethodNotAllowed = "method not allowed"
+	ETooLarge = "request too large"
 )

 // Error is the error struct of platform.


@@ -157,4 +157,5 @@ var statusCodePlatformError = map[string]int{
 	platform.ETooManyRequests: http.StatusTooManyRequests,
 	platform.EUnauthorized: http.StatusUnauthorized,
 	platform.EMethodNotAllowed: http.StatusMethodNotAllowed,
+	platform.ETooLarge: http.StatusRequestEntityTooLarge,
 }
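With this mapping in place, any handler that responds with an *influxdb.Error carrying platform.ETooLarge is rendered as an HTTP 413. A minimal sketch of the lookup, assuming a hypothetical helper name and a 500 fallback for unmapped codes (the real translation lives in this package's error-encoding path):

	// statusForCode is a hypothetical helper, for illustration only.
	func statusForCode(code string) int {
		if sc, ok := statusCodePlatformError[code]; ok {
			return sc
		}
		return http.StatusInternalServerError // fallback for unmapped codes
	}

	// statusForCode(platform.ETooLarge) == 413 (http.StatusRequestEntityTooLarge)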


@@ -3,6 +3,7 @@ package http
 import (
 	"compress/gzip"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -21,6 +22,13 @@ import (
 	"go.uber.org/zap"
 )

+var (
+	// ErrMaxBatchSizeExceeded is returned when a points batch exceeds
+	// the defined upper limit in bytes. The limit applies to the size of
+	// the batch after decompression (i.e. the ungzipped payload).
+	ErrMaxBatchSizeExceeded = errors.New("points batch is too large")
+)
+
 // WriteBackend is all services and associated parameters required to construct
 // the WriteHandler.
 type WriteBackend struct {
@@ -58,6 +66,19 @@ type WriteHandler struct {
 	PointsWriter  storage.PointsWriter
 	EventRecorder metric.EventRecorder
+
+	maxBatchSizeBytes int64
 }

+// WriteHandlerOption is a functional option for a *WriteHandler
+type WriteHandlerOption func(*WriteHandler)
+
+// WithMaxBatchSizeBytes configures the maximum size for a
+// (decompressed) points batch allowed by the write handler.
+func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
+	return func(w *WriteHandler) {
+		w.maxBatchSizeBytes = n
+	}
+}
+
 // Prefix provides the route prefix.
@@ -72,7 +93,7 @@ const (
 )

 // NewWriteHandler creates a new handler at /api/v2/write to receive line protocol.
-func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler {
+func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOption) *WriteHandler {
 	h := &WriteHandler{
 		Router: NewRouter(b.HTTPErrorHandler),
 		HTTPErrorHandler: b.HTTPErrorHandler,
@@ -84,6 +105,10 @@ func NewWriteHandler(log *zap.Logger, b *WriteBackend) *WriteHandler {
 		EventRecorder: b.WriteEventRecorder,
 	}

+	for _, opt := range opts {
+		opt(h)
+	}
+
 	h.HandlerFunc("POST", prefixWrite, h.handleWrite)
 	return h
 }
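Because the variadic options are applied after the struct is populated, enabling the limit is a one-line change at the call site. A hypothetical wiring sketch (log and writeBackend are assumed to come from the caller's setup; only NewWriteHandler and WithMaxBatchSizeBytes are from this diff):

	handler := NewWriteHandler(
		log,
		writeBackend,
		WithMaxBatchSizeBytes(10*1024*1024), // reject decompressed batches over 10 MiB
	)

With no option supplied, maxBatchSizeBytes stays zero and the limit is disabled, since handleWrite only wraps the body reader when the value is positive.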
@@ -97,9 +122,19 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
 	// TODO(desa): I really don't like how we're recording the usage metrics here
 	// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
-	var orgID influxdb.ID
-	var requestBytes int
-	sw := kithttp.NewStatusResponseWriter(w)
+	var (
+		orgID        influxdb.ID
+		requestBytes int
+		sw           = kithttp.NewStatusResponseWriter(w)
+		handleError  = func(err error, code, message string) {
+			h.HandleHTTPError(ctx, &influxdb.Error{
+				Code: code,
+				Op:   "http/handleWrite",
+				Msg:  message,
+				Err:  err,
+			}, w)
+		}
+	)
 	w = sw
 	defer func() {
 		h.EventRecorder.Record(ctx, metric.Event{
@@ -111,20 +146,22 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
 		})
 	}()

-	in := r.Body
+	var in io.ReadCloser = r.Body
+	defer in.Close()
+
 	if r.Header.Get("Content-Encoding") == "gzip" {
 		var err error
 		in, err = gzip.NewReader(r.Body)
 		if err != nil {
-			h.HandleHTTPError(ctx, &influxdb.Error{
-				Code: influxdb.EInvalid,
-				Op:   "http/handleWrite",
-				Msg:  errInvalidGzipHeader,
-				Err:  err,
-			}, w)
+			handleError(err, influxdb.EInvalid, errInvalidGzipHeader)
 			return
 		}
-		defer in.Close()
 	}

+	// if a limit is configured on the number of bytes in a
+	// batch, wrap the reader in a limited reader
+	if h.maxBatchSizeBytes > 0 {
+		in = newLimitedReadCloser(in, h.maxBatchSizeBytes)
+	}
+
 	a, err := pcontext.GetAuthorizer(ctx)
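Note the ordering above: the gzip reader is constructed first and the limiter wraps whatever reader ends up in in, so the limit is enforced against decompressed bytes, exactly as the ErrMaxBatchSizeExceeded comment promises. A toy check of that accounting, sketched from this diff's newLimitedReadCloser plus the standard library (bytes is the only import not already in this file):

	// ~1 KiB of line protocol gzips down to a few dozen bytes on the wire...
	raw := bytes.Repeat([]byte("m1,t1=v1 f1=1\n"), 73) // 1022 bytes
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(raw)
	zw.Close()

	// ...but the limiter sits above the gzip reader, so it counts all
	// 1022 inflated bytes and a 512-byte limit still trips.
	zr, _ := gzip.NewReader(&buf)
	in := newLimitedReadCloser(zr, 512)
	io.Copy(ioutil.Discard, in)
	fmt.Println(in.Close()) // points batch is too large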
@@ -183,21 +220,12 @@
 	p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
 	if err != nil {
-		h.HandleHTTPError(ctx, &influxdb.Error{
-			Code: influxdb.EInternal,
-			Op:   "http/handleWrite",
-			Msg:  fmt.Sprintf("unable to create permission for bucket: %v", err),
-			Err:  err,
-		}, w)
+		handleError(err, influxdb.EInternal, fmt.Sprintf("unable to create permission for bucket: %v", err))
 		return
 	}

 	if !a.Allowed(*p) {
-		h.HandleHTTPError(ctx, &influxdb.Error{
-			Code: influxdb.EForbidden,
-			Op:   "http/handleWrite",
-			Msg:  "insufficient permissions for write",
-		}, w)
+		handleError(err, influxdb.EForbidden, "insufficient permissions for write")
 		return
 	}
@@ -210,22 +238,28 @@
 	span.Finish()
 	if err != nil {
 		log.Error("Error reading body", zap.Error(err))
-		h.HandleHTTPError(ctx, &influxdb.Error{
-			Code: influxdb.EInternal,
-			Op:   "http/handleWrite",
-			Msg:  fmt.Sprintf("unable to read data: %v", err),
-			Err:  err,
-		}, w)
+		handleError(err, influxdb.EInternal, "unable to read data")
 		return
 	}

+	// close the reader now that all bytes have been consumed; Close
+	// returns a non-nil error if a configured limit was exceeded
+	if err := in.Close(); err != nil {
+		log.Error("Error reading body", zap.Error(err))
+		code := influxdb.EInternal
+		if errors.Is(err, ErrMaxBatchSizeExceeded) {
+			code = influxdb.ETooLarge
+		}
+		handleError(err, code, "unable to read data")
+		return
+	}
+
 	requestBytes = len(data)
 	if requestBytes == 0 {
-		h.HandleHTTPError(ctx, &influxdb.Error{
-			Code: influxdb.EInvalid,
-			Op:   "http/handleWrite",
-			Msg:  "writing requires points",
-		}, w)
+		handleError(err, influxdb.EInvalid, "writing requires points")
 		return
 	}
@@ -237,21 +271,13 @@
 	span.Finish()
 	if err != nil {
 		log.Error("Error parsing points", zap.Error(err))
-		h.HandleHTTPError(ctx, &influxdb.Error{
-			Code: influxdb.EInvalid,
-			Msg:  err.Error(),
-		}, w)
+		handleError(err, influxdb.EInvalid, "")
 		return
 	}

 	if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
 		log.Error("Error writing points", zap.Error(err))
-		h.HandleHTTPError(ctx, &influxdb.Error{
-			Code: influxdb.EInternal,
-			Op:   "http/handleWrite",
-			Msg:  "unexpected error writing points to database",
-			Err:  err,
-		}, w)
+		handleError(err, influxdb.EInternal, "unexpected error writing points to database")
 		return
 	}
@@ -369,3 +395,39 @@ func compressWithGzip(data io.Reader) (io.Reader, error) {
 	return pr, err
 }
+
+type limitedReader struct {
+	*io.LimitedReader
+	err   error
+	close func() error
+}
+
+func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReader {
+	// read up to max + 1 bytes: io.LimitedReader returns a plain EOF both
+	// when the limit is reached and when there is nothing left to read. If
+	// the extra byte is consumed, the batch must exceed the max batch size.
+	return &limitedReader{
+		LimitedReader: &io.LimitedReader{R: r, N: n + 1},
+		close:         r.Close,
+	}
+}
+
+// Close returns an ErrMaxBatchSizeExceeded when the wrapped reader
+// exceeds the set limit for number of bytes.
+// This is safe to call more than once but not concurrently.
+func (l *limitedReader) Close() (err error) {
+	defer func() {
+		if cerr := l.close(); cerr != nil && err == nil {
+			err = cerr
+		}
+
+		// only call close once
+		l.close = func() error { return nil }
+	}()
+
+	if l.N < 1 {
+		l.err = ErrMaxBatchSizeExceeded
+	}
+
+	return l.err
+}
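The subtlety here is that io.LimitedReader returns a bare EOF both when its limit is hit and when the source is simply drained, so the constructor grants one extra byte: that byte can only be consumed when the batch is larger than the limit, which Close detects by checking whether N has dropped below 1. A minimal sketch of the behavior, with a strings.Reader standing in for a request body:

	src := ioutil.NopCloser(strings.NewReader("abcdef")) // 6-byte "batch"
	lr := newLimitedReadCloser(src, 5)                   // 5-byte limit

	data, err := ioutil.ReadAll(lr)
	fmt.Println(len(data), err) // 6 <nil>: the read itself succeeds
	fmt.Println(lr.Close())     // points batch is too large
	fmt.Println(lr.Close())     // same error; safe to call again

This is also why handleWrite checks the error from in.Close() only after the body has been read in full: the read path reports no error even though the data was quietly capped at limit+1 bytes, and only Close reveals the overflow.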


@@ -84,6 +84,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
 		bucket *influxdb.Bucket // bucket to return in bucket service
 		bucketErr error // err to return in bucket service
 		writeErr error // err to return from the points writer
+		opts []WriteHandlerOption // write handler configured options
 	}

 	// want is the expected output of the HTTP endpoint
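The new test case below pins down the client-visible contract: a 13-byte body against a 5-byte limit yields a 413 whose JSON body chains the handler message with the sentinel error. From a client's point of view, reacting to that status might look like this sketch (url and batch are placeholders, not part of the diff):

	resp, err := http.Post(url, "text/plain; charset=utf-8", strings.NewReader(batch))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusRequestEntityTooLarge {
		// the (decompressed) batch exceeded the server's configured maximum;
		// split the points into smaller batches and retry
	}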
@@ -255,6 +256,24 @@
 				body: `{"code":"internal error","message":"authorizer not found on context"}`,
 			},
 		},
+		{
+			name: "large requests rejected",
+			request: request{
+				org:    "043e0780ee2b1000",
+				bucket: "04504b356e23b000",
+				body:   "m1,t1=v1 f1=1",
+				auth:   bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
+			},
+			state: state{
+				org:    testOrg("043e0780ee2b1000"),
+				bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
+				opts:   []WriteHandlerOption{WithMaxBatchSizeBytes(5)},
+			},
+			wants: wants{
+				code: 413,
+				body: `{"code":"request too large","message":"unable to read data: points batch is too large"}`,
+			},
+		},
 	}

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -275,7 +294,7 @@
 				PointsWriter: &mock.PointsWriter{Err: tt.state.writeErr},
 				WriteEventRecorder: &metric.NopEventRecorder{},
 			}
-			writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b))
+			writeHandler := NewWriteHandler(zaptest.NewLogger(t), NewWriteBackend(zaptest.NewLogger(t), b), tt.state.opts...)
 			handler := httpmock.NewAuthMiddlewareHandler(writeHandler, tt.request.auth)
 			r := httptest.NewRequest(