From bb72fdeae02afa71d96a9617f3c92928a65db9d4 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Thu, 12 Sep 2019 13:39:24 -0500 Subject: [PATCH] test(http): add unit tests for /api/v2/write We have been tracking down odd error messages when writing data and found the problem to be internal server errors when writing empty bodies. I added fairly comprehensive test coverage for /api/v2/write as well as simplify and clarify the error messages. --- context/token.go | 30 +-- http/authentication_middleware.go | 12 +- http/metric/recorder.go | 6 + http/requests.go | 6 + http/session_handler.go | 6 +- http/write_handler.go | 90 ++++----- http/write_handler_test.go | 299 +++++++++++++++++++++++++++++- 7 files changed, 379 insertions(+), 70 deletions(-) diff --git a/context/token.go b/context/token.go index 9297993f5a..352e51f2bd 100644 --- a/context/token.go +++ b/context/token.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" ) type contextKey string @@ -14,17 +14,23 @@ const ( ) // SetAuthorizer sets an authorizer on context. -func SetAuthorizer(ctx context.Context, a platform.Authorizer) context.Context { +func SetAuthorizer(ctx context.Context, a influxdb.Authorizer) context.Context { return context.WithValue(ctx, authorizerCtxKey, a) } // GetAuthorizer retrieves an authorizer from context. -func GetAuthorizer(ctx context.Context) (platform.Authorizer, error) { - a, ok := ctx.Value(authorizerCtxKey).(platform.Authorizer) +func GetAuthorizer(ctx context.Context) (influxdb.Authorizer, error) { + a, ok := ctx.Value(authorizerCtxKey).(influxdb.Authorizer) if !ok { - return nil, &platform.Error{ + return nil, &influxdb.Error{ Msg: "authorizer not found on context", - Code: platform.EInternal, + Code: influxdb.EInternal, + } + } + if a == nil { + return nil, &influxdb.Error{ + Code: influxdb.EInternal, + Msg: "unexpected invalid authorizer", } } @@ -33,19 +39,19 @@ func GetAuthorizer(ctx context.Context) (platform.Authorizer, error) { // GetToken retrieves a token from the context; errors if no token. func GetToken(ctx context.Context) (string, error) { - a, ok := ctx.Value(authorizerCtxKey).(platform.Authorizer) + a, ok := ctx.Value(authorizerCtxKey).(influxdb.Authorizer) if !ok { - return "", &platform.Error{ + return "", &influxdb.Error{ Msg: "authorizer not found on context", - Code: platform.EInternal, + Code: influxdb.EInternal, } } - auth, ok := a.(*platform.Authorization) + auth, ok := a.(*influxdb.Authorization) if !ok { - return "", &platform.Error{ + return "", &influxdb.Error{ Msg: fmt.Sprintf("authorizer not an authorization but a %T", a), - Code: platform.EInternal, + Code: influxdb.EInternal, } } diff --git a/http/authentication_middleware.go b/http/authentication_middleware.go index efb67b9ef7..fe537ca158 100644 --- a/http/authentication_middleware.go +++ b/http/authentication_middleware.go @@ -121,16 +121,16 @@ func (h *AuthenticationHandler) extractSession(ctx context.Context, r *http.Requ return ctx, err } - s, e := h.SessionService.FindSession(ctx, k) - if e != nil { - return ctx, e + s, err := h.SessionService.FindSession(ctx, k) + if err != nil { + return ctx, err } if !h.SessionRenewDisabled { // if the session is not expired, renew the session - e = h.SessionService.RenewSession(ctx, s, time.Now().Add(platform.RenewSessionTime)) - if e != nil { - return ctx, e + err = h.SessionService.RenewSession(ctx, s, time.Now().Add(platform.RenewSessionTime)) + if err != nil { + return ctx, err } } diff --git a/http/metric/recorder.go b/http/metric/recorder.go index f95a04eb54..4e35191ad5 100644 --- a/http/metric/recorder.go +++ b/http/metric/recorder.go @@ -19,3 +19,9 @@ type Event struct { ResponseBytes int Status int } + +// NopEventRecorder never records events. +type NopEventRecorder struct{} + +// Record never records events. +func (n *NopEventRecorder) Record(ctx context.Context, e Event) {} diff --git a/http/requests.go b/http/requests.go index 278d275683..cfb5cabcd4 100644 --- a/http/requests.go +++ b/http/requests.go @@ -15,6 +15,12 @@ const ( ) // queryOrganization returns the organization for any http request. +// +// It checks the org= and then orgID= parameter of the request. +// +// This will try to find the organization using an ID string or +// the name. It interprets the &org= parameter as either the name +// or the ID. func queryOrganization(ctx context.Context, r *http.Request, svc platform.OrganizationService) (o *platform.Organization, err error) { filter := platform.OrganizationFilter{} diff --git a/http/session_handler.go b/http/session_handler.go index d955a4f56a..ff538dff86 100644 --- a/http/session_handler.go +++ b/http/session_handler.go @@ -125,7 +125,7 @@ type signoutRequest struct { Key string } -func decodeSignoutRequest(ctx context.Context, r *http.Request) (*signoutRequest, *platform.Error) { +func decodeSignoutRequest(ctx context.Context, r *http.Request) (*signoutRequest, error) { key, err := decodeCookieSession(ctx, r) if err != nil { return nil, err @@ -145,12 +145,12 @@ func encodeCookieSession(w http.ResponseWriter, s *platform.Session) { http.SetCookie(w, c) } -func decodeCookieSession(ctx context.Context, r *http.Request) (string, *platform.Error) { +func decodeCookieSession(ctx context.Context, r *http.Request) (string, error) { c, err := r.Cookie(cookieSessionName) if err != nil { return "", &platform.Error{ - Err: err, Code: platform.EInvalid, + Err: err, } } return c.Value, nil diff --git a/http/write_handler.go b/http/write_handler.go index f64ca188b4..7496434201 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -13,7 +13,7 @@ import ( "github.com/julienschmidt/httprouter" "go.uber.org/zap" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" pcontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" @@ -24,13 +24,13 @@ import ( // WriteBackend is all services and associated parameters required to construct // the WriteHandler. type WriteBackend struct { - platform.HTTPErrorHandler + influxdb.HTTPErrorHandler Logger *zap.Logger WriteEventRecorder metric.EventRecorder PointsWriter storage.PointsWriter - BucketService platform.BucketService - OrganizationService platform.OrganizationService + BucketService influxdb.BucketService + OrganizationService influxdb.OrganizationService } // NewWriteBackend returns a new instance of WriteBackend. @@ -49,11 +49,11 @@ func NewWriteBackend(b *APIBackend) *WriteBackend { // WriteHandler receives line protocol and sends to a publish function. type WriteHandler struct { *httprouter.Router - platform.HTTPErrorHandler + influxdb.HTTPErrorHandler Logger *zap.Logger - BucketService platform.BucketService - OrganizationService platform.OrganizationService + BucketService influxdb.BucketService + OrganizationService influxdb.OrganizationService PointsWriter storage.PointsWriter @@ -92,7 +92,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { // TODO(desa): I really don't like how we're recording the usage metrics here // Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403 - var orgID platform.ID + var orgID influxdb.ID var requestBytes int sw := newStatusResponseWriter(w) w = sw @@ -111,8 +111,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { var err error in, err = gzip.NewReader(r.Body) if err != nil { - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EInvalid, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInvalid, Op: "http/handleWrite", Msg: errInvalidGzipHeader, Err: err, @@ -136,7 +136,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { logger := h.Logger.With(zap.String("org", req.Org), zap.String("bucket", req.Bucket)) - var org *platform.Organization + var org *influxdb.Organization org, err = queryOrganization(ctx, r, h.OrganizationService) if err != nil { logger.Info("Failed to find organization", zap.Error(err)) @@ -146,31 +146,28 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { orgID = org.ID - var bucket *platform.Bucket - if id, err := platform.IDFromString(req.Bucket); err == nil { + var bucket *influxdb.Bucket + if id, err := influxdb.IDFromString(req.Bucket); err == nil { // Decoded ID successfully. Make sure it's a real bucket. - b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{ + b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{ OrganizationID: &org.ID, ID: id, }) if err == nil { bucket = b - } else if platform.ErrorCode(err) != platform.ENotFound { + } else if influxdb.ErrorCode(err) != influxdb.ENotFound { h.HandleHTTPError(ctx, err, w) return } } if bucket == nil { - b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{ + b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{ OrganizationID: &org.ID, Name: &req.Bucket, }) if err != nil { - h.HandleHTTPError(ctx, &platform.Error{ - Op: "http/handleWrite", - Err: err, - }, w) + h.HandleHTTPError(ctx, err, w) return } @@ -179,18 +176,18 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { // TODO(jade): remove this after system buckets issue is resolved if bucket.IsSystem() { - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EForbidden, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EForbidden, Op: "http/handleWrite", Msg: fmt.Sprintf("cannot write to internal bucket %s", bucket.Name), }, w) return } - p, err := platform.NewPermissionAtID(bucket.ID, platform.WriteAction, platform.BucketsResourceType, org.ID) + p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID) if err != nil { - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EInternal, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInternal, Op: "http/handleWrite", Msg: fmt.Sprintf("unable to create permission for bucket: %v", err), Err: err, @@ -199,8 +196,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { } if !a.Allowed(*p) { - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EForbidden, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EForbidden, Op: "http/handleWrite", Msg: "insufficient permissions for write", }, w) @@ -213,36 +210,43 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { data, err := ioutil.ReadAll(in) if err != nil { logger.Error("Error reading body", zap.Error(err)) - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EInternal, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInternal, Op: "http/handleWrite", Msg: fmt.Sprintf("unable to read data: %v", err), Err: err, }, w) return } + requestBytes = len(data) + if requestBytes == 0 { + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInvalid, + Op: "http/handleWrite", + Msg: "writing requires points", + }, w) + return + } encoded := tsdb.EncodeName(org.ID, bucket.ID) mm := models.EscapeMeasurement(encoded[:]) points, err := models.ParsePointsWithPrecision(data, mm, time.Now(), req.Precision) if err != nil { logger.Error("Error parsing points", zap.Error(err)) - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EInvalid, - Op: "http/handleWrite", - Msg: fmt.Sprintf("unable to parse points: %v", err), - Err: err, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: err.Error(), }, w) return } if err := h.PointsWriter.WritePoints(ctx, points); err != nil { logger.Error("Error writing points", zap.Error(err)) - h.HandleHTTPError(ctx, &platform.Error{ - Code: platform.EInternal, + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInternal, Op: "http/handleWrite", - Msg: fmt.Sprintf("unable to write points to database: %v", err), + Msg: "unexpected error writing points to database", Err: err, }, w) return @@ -259,8 +263,8 @@ func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest } if !models.ValidPrecision(p) { - return nil, &platform.Error{ - Code: platform.EInvalid, + return nil, &influxdb.Error{ + Code: influxdb.EInvalid, Op: "http/decodeWriteRequest", Msg: errInvalidPrecision, } @@ -287,17 +291,17 @@ type WriteService struct { InsecureSkipVerify bool } -var _ platform.WriteService = (*WriteService)(nil) +var _ influxdb.WriteService = (*WriteService)(nil) -func (s *WriteService) Write(ctx context.Context, orgID, bucketID platform.ID, r io.Reader) error { +func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error { precision := s.Precision if precision == "" { precision = "ns" } if !models.ValidPrecision(precision) { - return &platform.Error{ - Code: platform.EInvalid, + return &influxdb.Error{ + Code: influxdb.EInvalid, Op: "http/Write", Msg: errInvalidPrecision, } diff --git a/http/write_handler_test.go b/http/write_handler_test.go index 503cb44b4e..08ef5aa7df 100644 --- a/http/write_handler_test.go +++ b/http/write_handler_test.go @@ -3,6 +3,7 @@ package http import ( "compress/gzip" "context" + "fmt" "io" "io/ioutil" "net/http" @@ -10,13 +11,18 @@ import ( "strings" "testing" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/http/metric" + httpmock "github.com/influxdata/influxdb/http/mock" + "github.com/influxdata/influxdb/mock" + influxtesting "github.com/influxdata/influxdb/testing" + "go.uber.org/zap/zaptest" ) func TestWriteService_Write(t *testing.T) { type args struct { - org platform.ID - bucket platform.ID + org influxdb.ID + bucket influxdb.ID r io.Reader } tests := []struct { @@ -38,11 +44,11 @@ func TestWriteService_Write(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var org, bucket *platform.ID + var org, bucket *influxdb.ID var lp []byte ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - org, _ = platform.IDFromString(r.URL.Query().Get("org")) - bucket, _ = platform.IDFromString(r.URL.Query().Get("bucket")) + org, _ = influxdb.IDFromString(r.URL.Query().Get("org")) + bucket, _ = influxdb.IDFromString(r.URL.Query().Get("bucket")) defer r.Body.Close() in, _ := gzip.NewReader(r.Body) defer in.Close() @@ -69,3 +75,284 @@ func TestWriteService_Write(t *testing.T) { }) } } + +func TestWriteHandler_handleWrite(t *testing.T) { + // state is the internal state of org and bucket services + type state struct { + org *influxdb.Organization // org to return in org service + orgErr error // err to return in org servce + bucket *influxdb.Bucket // bucket to return in bucket service + bucketErr error // err to return in bucket service + writeErr error // err to return from the points writer + } + + // want is the expected output of the HTTP endpoint + type wants struct { + body string + code int + } + + // request is sent to the HTTP endpoint + type request struct { + auth influxdb.Authorizer + org string + bucket string + body string + } + + tests := []struct { + name string + request request + state state + wants wants + }{ + { + name: "simple body is accepted", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + }, + wants: wants{ + code: 204, + }, + }, + { + name: "points writer error is an internal error", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + writeErr: fmt.Errorf("error"), + }, + wants: wants{ + code: 500, + body: `{"code":"internal error","message":"unexpected error writing points to database","op":"http/handleWrite","error":"error"}`, + }, + }, + { + name: "empty request body returns 400 error", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + }, + wants: wants{ + code: 400, + body: `{"code":"invalid","message":"writing requires points","op":"http/handleWrite"}`, + }, + }, + { + name: "org error returns 404 error", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + orgErr: &influxdb.Error{Code: influxdb.ENotFound, Msg: "not found"}, + }, + wants: wants{ + code: 404, + body: `{"code":"not found","message":"not found"}`, + }, + }, + { + name: "bucket error returns 404 error", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucketErr: &influxdb.Error{Code: influxdb.ENotFound, Msg: "not found"}, + }, + wants: wants{ + code: 404, + body: `{"code":"not found","message":"not found"}`, + }, + }, + { + name: "500 when bucket service returns internal error", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucketErr: &influxdb.Error{Code: influxdb.EInternal, Msg: "internal error"}, + }, + wants: wants{ + code: 500, + body: `{"code":"internal error","message":"internal error"}`, + }, + }, + { + name: "invalid line protocol returns 400", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + body: "invalid", + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + }, + wants: wants{ + code: 400, + body: `{"code":"invalid","message":"unable to parse 'invalid': missing fields"}`, + }, + }, + { + name: "forbidden to write to system buckets", + request: request{ + org: "043e0780ee2b1000", + bucket: "000000000000000a", + auth: bucketWritePermission("043e0780ee2b1000", "000000000000000a"), + body: "invalid", + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "000000000000000a"), + }, + wants: wants{ + code: 403, + body: `{"code":"forbidden","message":"cannot write to internal bucket ","op":"http/handleWrite"}`, + }, + }, + { + name: "forbidden to write with insufficient permission", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "000000000000000a"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + }, + wants: wants{ + code: 403, + body: `{"code":"forbidden","message":"insufficient permissions for write","op":"http/handleWrite"}`, + }, + }, + { + // authorization extraction happens in a different middleware. + name: "no authorizer is an internal error", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + }, + wants: wants{ + code: 500, + body: `{"code":"internal error","message":"authorizer not found on context"}`, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + orgs := mock.NewOrganizationService() + orgs.FindOrganizationF = func(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) { + return tt.state.org, tt.state.orgErr + } + buckets := mock.NewBucketService() + buckets.FindBucketFn = func(context.Context, influxdb.BucketFilter) (*influxdb.Bucket, error) { + return tt.state.bucket, tt.state.bucketErr + } + + b := &APIBackend{ + HTTPErrorHandler: DefaultErrorHandler, + Logger: zaptest.NewLogger(t), + OrganizationService: orgs, + BucketService: buckets, + PointsWriter: &mock.PointsWriter{Err: tt.state.writeErr}, + WriteEventRecorder: &metric.NopEventRecorder{}, + } + writeHandler := NewWriteHandler(NewWriteBackend(b)) + handler := httpmock.NewAuthMiddlewareHandler(writeHandler, tt.request.auth) + + r := httptest.NewRequest( + "POST", + "http://localhost:9999/api/v2/write", + strings.NewReader(tt.request.body), + ) + + params := r.URL.Query() + params.Set("org", tt.request.org) + params.Set("bucket", tt.request.bucket) + r.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + handler.ServeHTTP(w, r) + if got, want := w.Code, tt.wants.code; got != want { + t.Errorf("unexpected status code: got %d want %d", got, want) + } + + if got, want := w.Body.String(), tt.wants.body; got != want { + t.Errorf("unexpected body: got %s want %s", got, want) + } + }) + } +} + +var DefaultErrorHandler = ErrorHandler(0) + +func bucketWritePermission(org, bucket string) *influxdb.Authorization { + oid := influxtesting.MustIDBase16(org) + bid := influxtesting.MustIDBase16(bucket) + return &influxdb.Authorization{ + OrgID: oid, + Status: influxdb.Active, + Permissions: []influxdb.Permission{ + { + Action: influxdb.WriteAction, + Resource: influxdb.Resource{ + Type: influxdb.BucketsResourceType, + OrgID: &oid, + ID: &bid, + }, + }, + }, + } +} + +func testOrg(org string) *influxdb.Organization { + oid := influxtesting.MustIDBase16(org) + return &influxdb.Organization{ + ID: oid, + } +} + +func testBucket(org, bucket string) *influxdb.Bucket { + oid := influxtesting.MustIDBase16(org) + bid := influxtesting.MustIDBase16(bucket) + + return &influxdb.Bucket{ + ID: bid, + OrgID: oid, + } +}