Merge pull request #16531 from influxdata/sgc/issue/5661
feat(http): Enforce limits in write handler and expose config optionspull/16539/head
commit
1a97dcf17a
|
@ -31,6 +31,18 @@ type APIBackend struct {
|
|||
// in a single points batch
|
||||
MaxBatchSizeBytes int64
|
||||
|
||||
// WriteParserMaxBytes specifies the maximum number of bytes that may be allocated when processing a single
|
||||
// write request. A value of zero specifies there is no limit.
|
||||
WriteParserMaxBytes int
|
||||
|
||||
// WriteParserMaxLines specifies the maximum number of lines that may be parsed when processing a single
|
||||
// write request. A value of zero specifies there is no limit.
|
||||
WriteParserMaxLines int
|
||||
|
||||
// WriteParserMaxValues specifies the maximum number of values that may be parsed when processing a single
|
||||
// write request. A value of zero specifies there is no limit.
|
||||
WriteParserMaxValues int
|
||||
|
||||
NewBucketService func(*influxdb.Source) (influxdb.BucketService, error)
|
||||
NewQueryService func(*influxdb.Source) (query.ProxyQueryService, error)
|
||||
|
||||
|
@ -202,7 +214,12 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {
|
|||
h.Mount(prefixVariables, NewVariableHandler(b.Logger, variableBackend))
|
||||
|
||||
writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b)
|
||||
h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend, WithMaxBatchSizeBytes(b.MaxBatchSizeBytes)))
|
||||
h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend,
|
||||
WithMaxBatchSizeBytes(b.MaxBatchSizeBytes),
|
||||
WithParserMaxBytes(b.WriteParserMaxBytes),
|
||||
WithParserMaxLines(b.WriteParserMaxLines),
|
||||
WithParserMaxValues(b.WriteParserMaxValues),
|
||||
))
|
||||
|
||||
for _, o := range opts {
|
||||
o(h)
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/httprouter"
|
||||
"github.com/influxdata/influxdb"
|
||||
|
@ -68,6 +67,10 @@ type WriteHandler struct {
|
|||
EventRecorder metric.EventRecorder
|
||||
|
||||
maxBatchSizeBytes int64
|
||||
parserOptions []models.ParserOption
|
||||
parserMaxBytes int
|
||||
parserMaxLines int
|
||||
parserMaxValues int
|
||||
}
|
||||
|
||||
// WriteHandlerOption is a functional option for a *WriteHandler
|
||||
|
@ -81,6 +84,30 @@ func WithMaxBatchSizeBytes(n int64) WriteHandlerOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithParserMaxBytes specifies the maximum number of bytes that may be allocated when processing a single
|
||||
// write request. When n is zero, there is no limit.
|
||||
func WithParserMaxBytes(n int) WriteHandlerOption {
|
||||
return func(w *WriteHandler) {
|
||||
w.parserMaxBytes = n
|
||||
}
|
||||
}
|
||||
|
||||
// WithParserMaxLines specifies the maximum number of lines that may be parsed when processing a single
|
||||
// write request. When n is zero, there is no limit.
|
||||
func WithParserMaxLines(n int) WriteHandlerOption {
|
||||
return func(w *WriteHandler) {
|
||||
w.parserMaxLines = n
|
||||
}
|
||||
}
|
||||
|
||||
// WithParserMaxValues specifies the maximum number of values that may be parsed when processing a single
|
||||
// write request. When n is zero, there is no limit.
|
||||
func WithParserMaxValues(n int) WriteHandlerOption {
|
||||
return func(w *WriteHandler) {
|
||||
w.parserMaxValues = n
|
||||
}
|
||||
}
|
||||
|
||||
// Prefix provides the route prefix.
|
||||
func (*WriteHandler) Prefix() string {
|
||||
return prefixWrite
|
||||
|
@ -109,6 +136,17 @@ func NewWriteHandler(log *zap.Logger, b *WriteBackend, opts ...WriteHandlerOptio
|
|||
opt(h)
|
||||
}
|
||||
|
||||
// cache configured options
|
||||
if h.parserMaxBytes > 0 {
|
||||
h.parserOptions = append(h.parserOptions, models.WithParserMaxBytes(h.parserMaxBytes))
|
||||
}
|
||||
if h.parserMaxLines > 0 {
|
||||
h.parserOptions = append(h.parserOptions, models.WithParserMaxLines(h.parserMaxLines))
|
||||
}
|
||||
if h.parserMaxValues > 0 {
|
||||
h.parserOptions = append(h.parserOptions, models.WithParserMaxValues(h.parserMaxValues))
|
||||
}
|
||||
|
||||
h.HandlerFunc("POST", prefixWrite, h.handleWrite)
|
||||
return h
|
||||
}
|
||||
|
@ -235,12 +273,31 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|||
span, _ = tracing.StartSpanFromContextWithOperationName(ctx, "encoding and parsing")
|
||||
encoded := tsdb.EncodeName(org.ID, bucket.ID)
|
||||
mm := models.EscapeMeasurement(encoded[:])
|
||||
points, err := models.ParsePointsWithPrecision(data, mm, time.Now(), req.Precision)
|
||||
|
||||
var options []models.ParserOption
|
||||
if len(h.parserOptions) > 0 {
|
||||
options = make([]models.ParserOption, 0, len(h.parserOptions)+1)
|
||||
options = append(options, h.parserOptions...)
|
||||
}
|
||||
|
||||
if req.Precision != nil {
|
||||
options = append(options, req.Precision)
|
||||
}
|
||||
|
||||
points, err := models.ParsePointsWithOptions(data, mm, options...)
|
||||
span.LogKV("values_total", len(points))
|
||||
span.Finish()
|
||||
if err != nil {
|
||||
log.Error("Error parsing points", zap.Error(err))
|
||||
handleError(err, influxdb.EInvalid, "")
|
||||
|
||||
code := influxdb.EInvalid
|
||||
if errors.Is(err, models.ErrLimitMaxBytesExceeded) ||
|
||||
errors.Is(err, models.ErrLimitMaxLinesExceeded) ||
|
||||
errors.Is(err, models.ErrLimitMaxValuesExceeded) {
|
||||
code = influxdb.ETooLarge
|
||||
}
|
||||
|
||||
handleError(err, code, "")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -268,10 +325,15 @@ func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest
|
|||
}
|
||||
}
|
||||
|
||||
var precision models.ParserOption
|
||||
if p != "ns" {
|
||||
precision = models.WithParserPrecision(p)
|
||||
}
|
||||
|
||||
return &postWriteRequest{
|
||||
Bucket: qp.Get("bucket"),
|
||||
Org: qp.Get("org"),
|
||||
Precision: p,
|
||||
Precision: precision,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -311,7 +373,7 @@ func readWriteRequest(ctx context.Context, rc io.ReadCloser, encoding string, ma
|
|||
type postWriteRequest struct {
|
||||
Org string
|
||||
Bucket string
|
||||
Precision string
|
||||
Precision models.ParserOption
|
||||
}
|
||||
|
||||
// WriteService sends data over HTTP to influxdb via line protocol.
|
||||
|
|
|
@ -80,7 +80,7 @@ func TestWriteHandler_handleWrite(t *testing.T) {
|
|||
// state is the internal state of org and bucket services
|
||||
type state struct {
|
||||
org *influxdb.Organization // org to return in org service
|
||||
orgErr error // err to return in org servce
|
||||
orgErr error // err to return in org service
|
||||
bucket *influxdb.Bucket // bucket to return in bucket service
|
||||
bucketErr error // err to return in bucket service
|
||||
writeErr error // err to return from the points writer
|
||||
|
@ -274,6 +274,60 @@ func TestWriteHandler_handleWrite(t *testing.T) {
|
|||
body: `{"code":"request too large","message":"unable to read data: points batch is too large"}`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "bytes limit rejected",
|
||||
request: request{
|
||||
org: "043e0780ee2b1000",
|
||||
bucket: "04504b356e23b000",
|
||||
body: "m1,t1=v1 f1=1",
|
||||
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
|
||||
},
|
||||
state: state{
|
||||
org: testOrg("043e0780ee2b1000"),
|
||||
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
|
||||
opts: []WriteHandlerOption{WithParserMaxBytes(5)},
|
||||
},
|
||||
wants: wants{
|
||||
code: 413,
|
||||
body: `{"code":"request too large","message":"points: number of allocated bytes exceeded"}`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "lines limit rejected",
|
||||
request: request{
|
||||
org: "043e0780ee2b1000",
|
||||
bucket: "04504b356e23b000",
|
||||
body: "m1,t1=v1 f1=1\nm1,t1=v1 f1=1\nm1,t1=v1 f1=1\n",
|
||||
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
|
||||
},
|
||||
state: state{
|
||||
org: testOrg("043e0780ee2b1000"),
|
||||
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
|
||||
opts: []WriteHandlerOption{WithParserMaxLines(2)},
|
||||
},
|
||||
wants: wants{
|
||||
code: 413,
|
||||
body: `{"code":"request too large","message":"points: number of lines exceeded"}`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "values limit rejected",
|
||||
request: request{
|
||||
org: "043e0780ee2b1000",
|
||||
bucket: "04504b356e23b000",
|
||||
body: "m1,t1=v1 f1=1,f2=2\nm1,t1=v1 f1=1,f2=2\nm1,t1=v1 f1=1,f2=2\n",
|
||||
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
|
||||
},
|
||||
state: state{
|
||||
org: testOrg("043e0780ee2b1000"),
|
||||
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
|
||||
opts: []WriteHandlerOption{WithParserMaxValues(4)},
|
||||
},
|
||||
wants: wants{
|
||||
code: 413,
|
||||
body: `{"code":"request too large","message":"points: number of values exceeded"}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue