test(http): add unit tests for /api/v2/write (#15128)

test(http): add unit tests for /api/v2/write
pull/15138/head
Chris Goller 2019-09-12 13:57:06 -05:00 committed by GitHub
commit 5b4c0db4a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 379 additions and 70 deletions

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
)
type contextKey string
@ -14,17 +14,23 @@ const (
)
// SetAuthorizer sets an authorizer on context.
func SetAuthorizer(ctx context.Context, a platform.Authorizer) context.Context {
func SetAuthorizer(ctx context.Context, a influxdb.Authorizer) context.Context {
return context.WithValue(ctx, authorizerCtxKey, a)
}
// GetAuthorizer retrieves an authorizer from context.
func GetAuthorizer(ctx context.Context) (platform.Authorizer, error) {
a, ok := ctx.Value(authorizerCtxKey).(platform.Authorizer)
func GetAuthorizer(ctx context.Context) (influxdb.Authorizer, error) {
a, ok := ctx.Value(authorizerCtxKey).(influxdb.Authorizer)
if !ok {
return nil, &platform.Error{
return nil, &influxdb.Error{
Msg: "authorizer not found on context",
Code: platform.EInternal,
Code: influxdb.EInternal,
}
}
if a == nil {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "unexpected invalid authorizer",
}
}
@ -33,19 +39,19 @@ func GetAuthorizer(ctx context.Context) (platform.Authorizer, error) {
// GetToken retrieves a token from the context; errors if no token.
func GetToken(ctx context.Context) (string, error) {
a, ok := ctx.Value(authorizerCtxKey).(platform.Authorizer)
a, ok := ctx.Value(authorizerCtxKey).(influxdb.Authorizer)
if !ok {
return "", &platform.Error{
return "", &influxdb.Error{
Msg: "authorizer not found on context",
Code: platform.EInternal,
Code: influxdb.EInternal,
}
}
auth, ok := a.(*platform.Authorization)
auth, ok := a.(*influxdb.Authorization)
if !ok {
return "", &platform.Error{
return "", &influxdb.Error{
Msg: fmt.Sprintf("authorizer not an authorization but a %T", a),
Code: platform.EInternal,
Code: influxdb.EInternal,
}
}

View File

@ -121,16 +121,16 @@ func (h *AuthenticationHandler) extractSession(ctx context.Context, r *http.Requ
return ctx, err
}
s, e := h.SessionService.FindSession(ctx, k)
if e != nil {
return ctx, e
s, err := h.SessionService.FindSession(ctx, k)
if err != nil {
return ctx, err
}
if !h.SessionRenewDisabled {
// if the session is not expired, renew the session
e = h.SessionService.RenewSession(ctx, s, time.Now().Add(platform.RenewSessionTime))
if e != nil {
return ctx, e
err = h.SessionService.RenewSession(ctx, s, time.Now().Add(platform.RenewSessionTime))
if err != nil {
return ctx, err
}
}

View File

@ -19,3 +19,9 @@ type Event struct {
ResponseBytes int
Status int
}
// NopEventRecorder never records events.
type NopEventRecorder struct{}
// Record never records events.
func (n *NopEventRecorder) Record(ctx context.Context, e Event) {}

View File

@ -15,6 +15,12 @@ const (
)
// queryOrganization returns the organization for any http request.
//
// It checks the org= and then orgID= parameter of the request.
//
// This will try to find the organization using an ID string or
// the name. It interprets the &org= parameter as either the name
// or the ID.
func queryOrganization(ctx context.Context, r *http.Request, svc platform.OrganizationService) (o *platform.Organization, err error) {
filter := platform.OrganizationFilter{}

View File

@ -125,7 +125,7 @@ type signoutRequest struct {
Key string
}
func decodeSignoutRequest(ctx context.Context, r *http.Request) (*signoutRequest, *platform.Error) {
func decodeSignoutRequest(ctx context.Context, r *http.Request) (*signoutRequest, error) {
key, err := decodeCookieSession(ctx, r)
if err != nil {
return nil, err
@ -145,12 +145,12 @@ func encodeCookieSession(w http.ResponseWriter, s *platform.Session) {
http.SetCookie(w, c)
}
func decodeCookieSession(ctx context.Context, r *http.Request) (string, *platform.Error) {
func decodeCookieSession(ctx context.Context, r *http.Request) (string, error) {
c, err := r.Cookie(cookieSessionName)
if err != nil {
return "", &platform.Error{
Err: err,
Code: platform.EInvalid,
Err: err,
}
}
return c.Value, nil

View File

@ -13,7 +13,7 @@ import (
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/models"
@ -24,13 +24,13 @@ import (
// WriteBackend is all services and associated parameters required to construct
// the WriteHandler.
type WriteBackend struct {
platform.HTTPErrorHandler
influxdb.HTTPErrorHandler
Logger *zap.Logger
WriteEventRecorder metric.EventRecorder
PointsWriter storage.PointsWriter
BucketService platform.BucketService
OrganizationService platform.OrganizationService
BucketService influxdb.BucketService
OrganizationService influxdb.OrganizationService
}
// NewWriteBackend returns a new instance of WriteBackend.
@ -49,11 +49,11 @@ func NewWriteBackend(b *APIBackend) *WriteBackend {
// WriteHandler receives line protocol and sends to a publish function.
type WriteHandler struct {
*httprouter.Router
platform.HTTPErrorHandler
influxdb.HTTPErrorHandler
Logger *zap.Logger
BucketService platform.BucketService
OrganizationService platform.OrganizationService
BucketService influxdb.BucketService
OrganizationService influxdb.OrganizationService
PointsWriter storage.PointsWriter
@ -92,7 +92,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
// TODO(desa): I really don't like how we're recording the usage metrics here
// Ideally this will be moved when we solve https://github.com/influxdata/influxdb/issues/13403
var orgID platform.ID
var orgID influxdb.ID
var requestBytes int
sw := newStatusResponseWriter(w)
w = sw
@ -111,8 +111,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
var err error
in, err = gzip.NewReader(r.Body)
if err != nil {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInvalid,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: errInvalidGzipHeader,
Err: err,
@ -136,7 +136,7 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
logger := h.Logger.With(zap.String("org", req.Org), zap.String("bucket", req.Bucket))
var org *platform.Organization
var org *influxdb.Organization
org, err = queryOrganization(ctx, r, h.OrganizationService)
if err != nil {
logger.Info("Failed to find organization", zap.Error(err))
@ -146,31 +146,28 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
orgID = org.ID
var bucket *platform.Bucket
if id, err := platform.IDFromString(req.Bucket); err == nil {
var bucket *influxdb.Bucket
if id, err := influxdb.IDFromString(req.Bucket); err == nil {
// Decoded ID successfully. Make sure it's a real bucket.
b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{
b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
OrganizationID: &org.ID,
ID: id,
})
if err == nil {
bucket = b
} else if platform.ErrorCode(err) != platform.ENotFound {
} else if influxdb.ErrorCode(err) != influxdb.ENotFound {
h.HandleHTTPError(ctx, err, w)
return
}
}
if bucket == nil {
b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{
b, err := h.BucketService.FindBucket(ctx, influxdb.BucketFilter{
OrganizationID: &org.ID,
Name: &req.Bucket,
})
if err != nil {
h.HandleHTTPError(ctx, &platform.Error{
Op: "http/handleWrite",
Err: err,
}, w)
h.HandleHTTPError(ctx, err, w)
return
}
@ -179,18 +176,18 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
// TODO(jade): remove this after system buckets issue is resolved
if bucket.IsSystem() {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EForbidden,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Op: "http/handleWrite",
Msg: fmt.Sprintf("cannot write to internal bucket %s", bucket.Name),
}, w)
return
}
p, err := platform.NewPermissionAtID(bucket.ID, platform.WriteAction, platform.BucketsResourceType, org.ID)
p, err := influxdb.NewPermissionAtID(bucket.ID, influxdb.WriteAction, influxdb.BucketsResourceType, org.ID)
if err != nil {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInternal,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to create permission for bucket: %v", err),
Err: err,
@ -199,8 +196,8 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
}
if !a.Allowed(*p) {
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EForbidden,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Op: "http/handleWrite",
Msg: "insufficient permissions for write",
}, w)
@ -213,36 +210,43 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(in)
if err != nil {
logger.Error("Error reading body", zap.Error(err))
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInternal,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to read data: %v", err),
Err: err,
}, w)
return
}
requestBytes = len(data)
if requestBytes == 0 {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/handleWrite",
Msg: "writing requires points",
}, w)
return
}
encoded := tsdb.EncodeName(org.ID, bucket.ID)
mm := models.EscapeMeasurement(encoded[:])
points, err := models.ParsePointsWithPrecision(data, mm, time.Now(), req.Precision)
if err != nil {
logger.Error("Error parsing points", zap.Error(err))
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInvalid,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to parse points: %v", err),
Err: err,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: err.Error(),
}, w)
return
}
if err := h.PointsWriter.WritePoints(ctx, points); err != nil {
logger.Error("Error writing points", zap.Error(err))
h.HandleHTTPError(ctx, &platform.Error{
Code: platform.EInternal,
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleWrite",
Msg: fmt.Sprintf("unable to write points to database: %v", err),
Msg: "unexpected error writing points to database",
Err: err,
}, w)
return
@ -259,8 +263,8 @@ func decodeWriteRequest(ctx context.Context, r *http.Request) (*postWriteRequest
}
if !models.ValidPrecision(p) {
return nil, &platform.Error{
Code: platform.EInvalid,
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/decodeWriteRequest",
Msg: errInvalidPrecision,
}
@ -287,17 +291,17 @@ type WriteService struct {
InsecureSkipVerify bool
}
var _ platform.WriteService = (*WriteService)(nil)
var _ influxdb.WriteService = (*WriteService)(nil)
func (s *WriteService) Write(ctx context.Context, orgID, bucketID platform.ID, r io.Reader) error {
func (s *WriteService) Write(ctx context.Context, orgID, bucketID influxdb.ID, r io.Reader) error {
precision := s.Precision
if precision == "" {
precision = "ns"
}
if !models.ValidPrecision(precision) {
return &platform.Error{
Code: platform.EInvalid,
return &influxdb.Error{
Code: influxdb.EInvalid,
Op: "http/Write",
Msg: errInvalidPrecision,
}

View File

@ -3,6 +3,7 @@ package http
import (
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
@ -10,13 +11,18 @@ import (
"strings"
"testing"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/http/metric"
httpmock "github.com/influxdata/influxdb/http/mock"
"github.com/influxdata/influxdb/mock"
influxtesting "github.com/influxdata/influxdb/testing"
"go.uber.org/zap/zaptest"
)
func TestWriteService_Write(t *testing.T) {
type args struct {
org platform.ID
bucket platform.ID
org influxdb.ID
bucket influxdb.ID
r io.Reader
}
tests := []struct {
@ -38,11 +44,11 @@ func TestWriteService_Write(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var org, bucket *platform.ID
var org, bucket *influxdb.ID
var lp []byte
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
org, _ = platform.IDFromString(r.URL.Query().Get("org"))
bucket, _ = platform.IDFromString(r.URL.Query().Get("bucket"))
org, _ = influxdb.IDFromString(r.URL.Query().Get("org"))
bucket, _ = influxdb.IDFromString(r.URL.Query().Get("bucket"))
defer r.Body.Close()
in, _ := gzip.NewReader(r.Body)
defer in.Close()
@ -69,3 +75,284 @@ func TestWriteService_Write(t *testing.T) {
})
}
}
func TestWriteHandler_handleWrite(t *testing.T) {
// state is the internal state of org and bucket services
type state struct {
org *influxdb.Organization // org to return in org service
orgErr error // err to return in org servce
bucket *influxdb.Bucket // bucket to return in bucket service
bucketErr error // err to return in bucket service
writeErr error // err to return from the points writer
}
// want is the expected output of the HTTP endpoint
type wants struct {
body string
code int
}
// request is sent to the HTTP endpoint
type request struct {
auth influxdb.Authorizer
org string
bucket string
body string
}
tests := []struct {
name string
request request
state state
wants wants
}{
{
name: "simple body is accepted",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
},
wants: wants{
code: 204,
},
},
{
name: "points writer error is an internal error",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
writeErr: fmt.Errorf("error"),
},
wants: wants{
code: 500,
body: `{"code":"internal error","message":"unexpected error writing points to database","op":"http/handleWrite","error":"error"}`,
},
},
{
name: "empty request body returns 400 error",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
},
wants: wants{
code: 400,
body: `{"code":"invalid","message":"writing requires points","op":"http/handleWrite"}`,
},
},
{
name: "org error returns 404 error",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
orgErr: &influxdb.Error{Code: influxdb.ENotFound, Msg: "not found"},
},
wants: wants{
code: 404,
body: `{"code":"not found","message":"not found"}`,
},
},
{
name: "bucket error returns 404 error",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucketErr: &influxdb.Error{Code: influxdb.ENotFound, Msg: "not found"},
},
wants: wants{
code: 404,
body: `{"code":"not found","message":"not found"}`,
},
},
{
name: "500 when bucket service returns internal error",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucketErr: &influxdb.Error{Code: influxdb.EInternal, Msg: "internal error"},
},
wants: wants{
code: 500,
body: `{"code":"internal error","message":"internal error"}`,
},
},
{
name: "invalid line protocol returns 400",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"),
body: "invalid",
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
},
wants: wants{
code: 400,
body: `{"code":"invalid","message":"unable to parse 'invalid': missing fields"}`,
},
},
{
name: "forbidden to write to system buckets",
request: request{
org: "043e0780ee2b1000",
bucket: "000000000000000a",
auth: bucketWritePermission("043e0780ee2b1000", "000000000000000a"),
body: "invalid",
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "000000000000000a"),
},
wants: wants{
code: 403,
body: `{"code":"forbidden","message":"cannot write to internal bucket ","op":"http/handleWrite"}`,
},
},
{
name: "forbidden to write with insufficient permission",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
body: "m1,t1=v1 f1=1",
auth: bucketWritePermission("043e0780ee2b1000", "000000000000000a"),
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
},
wants: wants{
code: 403,
body: `{"code":"forbidden","message":"insufficient permissions for write","op":"http/handleWrite"}`,
},
},
{
// authorization extraction happens in a different middleware.
name: "no authorizer is an internal error",
request: request{
org: "043e0780ee2b1000",
bucket: "04504b356e23b000",
},
state: state{
org: testOrg("043e0780ee2b1000"),
bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"),
},
wants: wants{
code: 500,
body: `{"code":"internal error","message":"authorizer not found on context"}`,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
orgs := mock.NewOrganizationService()
orgs.FindOrganizationF = func(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
return tt.state.org, tt.state.orgErr
}
buckets := mock.NewBucketService()
buckets.FindBucketFn = func(context.Context, influxdb.BucketFilter) (*influxdb.Bucket, error) {
return tt.state.bucket, tt.state.bucketErr
}
b := &APIBackend{
HTTPErrorHandler: DefaultErrorHandler,
Logger: zaptest.NewLogger(t),
OrganizationService: orgs,
BucketService: buckets,
PointsWriter: &mock.PointsWriter{Err: tt.state.writeErr},
WriteEventRecorder: &metric.NopEventRecorder{},
}
writeHandler := NewWriteHandler(NewWriteBackend(b))
handler := httpmock.NewAuthMiddlewareHandler(writeHandler, tt.request.auth)
r := httptest.NewRequest(
"POST",
"http://localhost:9999/api/v2/write",
strings.NewReader(tt.request.body),
)
params := r.URL.Query()
params.Set("org", tt.request.org)
params.Set("bucket", tt.request.bucket)
r.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
handler.ServeHTTP(w, r)
if got, want := w.Code, tt.wants.code; got != want {
t.Errorf("unexpected status code: got %d want %d", got, want)
}
if got, want := w.Body.String(), tt.wants.body; got != want {
t.Errorf("unexpected body: got %s want %s", got, want)
}
})
}
}
var DefaultErrorHandler = ErrorHandler(0)
func bucketWritePermission(org, bucket string) *influxdb.Authorization {
oid := influxtesting.MustIDBase16(org)
bid := influxtesting.MustIDBase16(bucket)
return &influxdb.Authorization{
OrgID: oid,
Status: influxdb.Active,
Permissions: []influxdb.Permission{
{
Action: influxdb.WriteAction,
Resource: influxdb.Resource{
Type: influxdb.BucketsResourceType,
OrgID: &oid,
ID: &bid,
},
},
},
}
}
func testOrg(org string) *influxdb.Organization {
oid := influxtesting.MustIDBase16(org)
return &influxdb.Organization{
ID: oid,
}
}
func testBucket(org, bucket string) *influxdb.Bucket {
oid := influxtesting.MustIDBase16(org)
bid := influxtesting.MustIDBase16(bucket)
return &influxdb.Bucket{
ID: bid,
OrgID: oid,
}
}