From 560cd3faee50c8433615ff7ff4d8d7857dc54b00 Mon Sep 17 00:00:00 2001 From: George Date: Fri, 10 Jan 2020 16:00:37 +0000 Subject: [PATCH] fix(http): expose max batch size bytes limit open on *APIBackend (#16497) --- http/api_handler.go | 5 +++- http/write_handler.go | 70 ++++++++++++++++++++++--------------------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/http/api_handler.go b/http/api_handler.go index b2704607ed..c2029f987f 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -27,6 +27,9 @@ type APIBackend struct { Logger *zap.Logger influxdb.HTTPErrorHandler SessionRenewDisabled bool + // MaxBatchSizeBytes is the maximum number of bytes which can be written + // in a single points batch + MaxBatchSizeBytes int64 NewBucketService func(*influxdb.Source) (influxdb.BucketService, error) NewQueryService func(*influxdb.Source) (query.ProxyQueryService, error) @@ -199,7 +202,7 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { h.Mount(prefixVariables, NewVariableHandler(b.Logger, variableBackend)) writeBackend := NewWriteBackend(b.Logger.With(zap.String("handler", "write")), b) - h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend)) + h.Mount(prefixWrite, NewWriteHandler(b.Logger, writeBackend, WithMaxBatchSizeBytes(b.MaxBatchSizeBytes))) for _, o := range opts { o(h) diff --git a/http/write_handler.go b/http/write_handler.go index 534c2f343f..2d22317f64 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -146,24 +146,6 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { }) }() - var in io.ReadCloser = r.Body - defer in.Close() - - if r.Header.Get("Content-Encoding") == "gzip" { - var err error - in, err = gzip.NewReader(r.Body) - if err != nil { - handleError(err, influxdb.EInvalid, errInvalidGzipHeader) - return - } - } - - // given a limit is configured on the number of bytes in a - // batch then wrap the reader in a limited reader - if h.maxBatchSizeBytes > 0 { - in = newLimitedReadCloser(in, h.maxBatchSizeBytes) - } - a, err := pcontext.GetAuthorizer(ctx) if err != nil { h.HandleHTTPError(ctx, err, w) @@ -229,28 +211,15 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { return } - // TODO(jeff): we should be publishing with the org and bucket instead of - // parsing, rewriting, and publishing, but the interface isn't quite there yet. - // be sure to remove this when it is there! - span, _ = tracing.StartSpanFromContextWithOperationName(ctx, "read request body") - data, err := ioutil.ReadAll(in) - span.LogKV("request_bytes", len(data)) - span.Finish() + data, err := readWriteRequest(ctx, r.Body, r.Header.Get("Content-Encoding"), h.maxBatchSizeBytes) if err != nil { log.Error("Error reading body", zap.Error(err)) - handleError(err, influxdb.EInternal, "unable to read data") - return - } - - // close the reader now that all bytes have been consumed - // this will return non-nil in the case of a configured limit - // being exceeded - if err := in.Close(); err != nil { - log.Error("Error reading body", zap.Error(err)) code := influxdb.EInternal if errors.Is(err, ErrMaxBatchSizeExceeded) { code = influxdb.ETooLarge + } else if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) { + code = influxdb.EInvalid } handleError(err, code, "unable to read data") @@ -306,6 +275,39 @@ func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest }, nil } +func readWriteRequest(ctx context.Context, rc io.ReadCloser, encoding string, maxBatchSizeBytes int64) (v []byte, err error) { + defer func() { + // close the reader now that all bytes have been consumed + // this will return non-nil in the case of a configured limit + // being exceeded + if cerr := rc.Close(); err == nil { + err = cerr + } + }() + + switch encoding { + case "gzip", "x-gzip": + rc, err = gzip.NewReader(rc) + if err != nil { + return nil, err + } + } + + // given a limit is configured on the number of bytes in a + // batch then wrap the reader in a limited reader + if maxBatchSizeBytes > 0 { + rc = newLimitedReadCloser(rc, maxBatchSizeBytes) + } + + span, _ := tracing.StartSpanFromContextWithOperationName(ctx, "read request body") + defer func() { + span.LogKV("request_bytes", len(v)) + span.Finish() + }() + + return ioutil.ReadAll(rc) +} + type postWriteRequest struct { Org string Bucket string