feat(pkger): add create stack endpoint for http server

pull/17447/head
Johnny Steenbergen 2020-03-23 16:51:43 -07:00 committed by Johnny Steenbergen
parent 7a24e450c1
commit 8c70dc9b23
9 changed files with 367 additions and 15 deletions

View File

@ -867,12 +867,14 @@ func (m *Launcher) run(ctx context.Context) (err error) {
pkgerLogger := m.log.With(zap.String("service", "pkger"))
pkgSVC = pkger.NewService(
pkger.WithLogger(pkgerLogger),
pkger.WithStore(pkger.NewStoreKV(m.kvStore)),
pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService, b.UserResourceMappingService)),
pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)),
pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)),
pkger.WithLabelSVC(authorizer.NewLabelServiceWithOrg(b.LabelService, b.OrgLookupService)),
pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)),
pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)),
pkger.WithOrganizationService(authorizer.NewOrgService(b.OrganizationService)),
pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)),
pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)),
pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)),

View File

@ -67,7 +67,7 @@ func NewTestLauncher() *TestLauncher {
return l
}
// RunLauncherOrFail initializes and starts the server.
// RunTestLauncherOrFail initializes and starts the server.
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) *TestLauncher {
tb.Helper()
l := NewTestLauncher()

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"testing"
"time"
@ -23,6 +24,28 @@ func TestLauncher_Pkger(t *testing.T) {
svc := l.PkgerService(t)
t.Run("creating a stack", func(t *testing.T) {
expectedURLs := []url.URL{newURL(t, "http://example.com")}
fmt.Println("org init id: ", l.Org.ID)
newStack, err := svc.InitStack(timedCtx(5*time.Second), l.User.ID, pkger.Stack{
OrgID: l.Org.ID,
Name: "first stack",
Desc: "desc",
URLs: expectedURLs,
})
require.NoError(t, err)
assert.NotZero(t, newStack.ID)
assert.Equal(t, l.Org.ID, newStack.OrgID)
assert.Equal(t, "first stack", newStack.Name)
assert.Equal(t, "desc", newStack.Desc)
assert.Equal(t, expectedURLs, newStack.URLs)
assert.NotNil(t, newStack.Resources)
assert.NotZero(t, newStack.CRUDLog)
})
t.Run("errors incurred during application of package rolls back to state before package", func(t *testing.T) {
svc := pkger.NewService(
pkger.WithBucketSVC(l.BucketService(t)),
@ -1267,3 +1290,11 @@ func (f *fakeLabelSVC) CreateLabelMapping(ctx context.Context, m *influxdb.Label
}
return f.LabelService.CreateLabelMapping(ctx, m)
}
func newURL(t *testing.T, rawurl string) url.URL {
t.Helper()
u, err := url.Parse(rawurl)
require.NoError(t, err)
return *u
}

View File

@ -4359,6 +4359,64 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/packages/stacks:
post:
operationId: CreatePkg
tags:
- InfluxPackages
summary: Create a new Influx package
requestBody:
description: Influx package to create.
required: true
content:
application/json:
schema:
type: object
properties:
orgID:
type: string
name:
type: string
description:
type: string
urls:
type: array
items:
type: string
responses:
'201':
description: Influx stack created
content:
application/json:
schema:
type: object
properties:
id:
type: string
orgID:
type: string
name:
type: string
description:
type: string
urls:
type: array
items:
type: string
createdAt:
type: string
format: date-time
readOnly: true
updatedAt:
type: string
format: date-time
readOnly: true
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/tasks:
get:
operationId: GetTasks

View File

@ -2,7 +2,9 @@ package pkger
import (
"context"
"fmt"
"net/http"
"net/url"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/pkg/httpc"
@ -16,7 +18,54 @@ type HTTPRemoteService struct {
var _ SVC = (*HTTPRemoteService)(nil)
func (s *HTTPRemoteService) InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error) {
panic("not implemented yet")
reqBody := ReqCreateStack{
OrgID: stack.OrgID.String(),
Name: stack.Name,
Description: stack.Desc,
}
for _, u := range stack.URLs {
reqBody.URLs = append(reqBody.URLs, u.String())
}
var respBody RespCreateStack
err := s.Client.
PostJSON(reqBody, RoutePrefix, "/stacks").
DecodeJSON(&respBody).
Do(ctx)
if err != nil {
return Stack{}, err
}
newStack := Stack{
Name: respBody.Name,
Desc: respBody.Description,
Resources: make([]StackResource, 0),
CRUDLog: respBody.CRUDLog,
}
id, err := influxdb.IDFromString(respBody.ID)
if err != nil {
fmt.Println("IN HERE with id: ", respBody.ID)
return Stack{}, err
}
newStack.ID = *id
orgID, err := influxdb.IDFromString(respBody.OrgID)
if err != nil {
fmt.Println("IN HERE with orgID: ", respBody.OrgID)
return Stack{}, err
}
newStack.OrgID = *orgID
for _, rawurl := range respBody.URLs {
u, err := url.Parse(rawurl)
if err != nil {
return Stack{}, err
}
newStack.URLs = append(newStack.URLs, *u)
}
return newStack, nil
}
// CreatePkg will produce a pkg from the parameters provided.

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
@ -49,6 +50,7 @@ func NewHTTPServer(log *zap.Logger, svc SVC) *HTTPServer {
Post("/", svr.createPkg)
r.With(middleware.SetHeader("Content-Type", "application/json; charset=utf-8")).
Post("/apply", svr.applyPkg)
r.Post("/stacks", svr.createStack)
}
svr.Router = r
@ -60,6 +62,99 @@ func (s *HTTPServer) Prefix() string {
return RoutePrefix
}
// ReqCreateStack is a request body for a create stack call.
type ReqCreateStack struct {
OrgID string `json:"orgID"`
Name string `json:"name"`
Description string `json:"description"`
URLs []string `json:"urls"`
}
// OK validates the request body is valid.
func (r *ReqCreateStack) OK() error {
// TODO: provide multiple errors back for failing validation
if _, err := influxdb.IDFromString(r.OrgID); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("provided org id[%q] is invalid", r.OrgID),
}
}
for _, u := range r.URLs {
if _, err := url.Parse(u); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("provided url[%q] is invalid", u),
}
}
}
return nil
}
func (r *ReqCreateStack) orgID() influxdb.ID {
orgID, _ := influxdb.IDFromString(r.OrgID)
return *orgID
}
func (r *ReqCreateStack) urls() []url.URL {
urls := make([]url.URL, 0, len(r.URLs))
for _, urlStr := range r.URLs {
u, _ := url.Parse(urlStr)
urls = append(urls, *u)
}
return urls
}
// RespCreateStack is the response body for the create stack call.
type RespCreateStack struct {
ID string `json:"id"`
OrgID string `json:"orgID"`
Name string `json:"name"`
Description string `json:"description"`
URLs []string `json:"urls"`
influxdb.CRUDLog
}
func (s *HTTPServer) createStack(w http.ResponseWriter, r *http.Request) {
var reqBody ReqCreateStack
if err := s.api.DecodeJSON(r.Body, &reqBody); err != nil {
s.api.Err(w, err)
return
}
defer r.Body.Close()
auth, err := pctx.GetAuthorizer(r.Context())
if err != nil {
s.api.Err(w, err)
return
}
stack, err := s.svc.InitStack(r.Context(), auth.GetUserID(), Stack{
OrgID: reqBody.orgID(),
Name: reqBody.Name,
Desc: reqBody.Description,
URLs: reqBody.urls(),
})
if err != nil {
s.api.Err(w, err)
return
}
urlStrs := make([]string, 0, len(stack.URLs))
for _, u := range stack.URLs {
urlStrs = append(urlStrs, u.String())
}
s.api.Respond(w, http.StatusCreated, RespCreateStack{
ID: stack.ID.String(),
OrgID: stack.OrgID.String(),
Name: stack.Name,
Description: stack.Desc,
URLs: urlStrs,
CRUDLog: stack.CRUDLog,
})
}
// ReqCreateOrgIDOpt provides options to export resources by organization id.
type ReqCreateOrgIDOpt struct {
OrgID string `json:"orgID"`

View File

@ -13,6 +13,7 @@ import (
"path"
"strings"
"testing"
"time"
"github.com/go-chi/chi"
"github.com/influxdata/influxdb"
@ -148,7 +149,7 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -202,7 +203,7 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -304,7 +305,7 @@ func TestPkgerHTTPServer(t *testing.T) {
for _, tt := range tests {
fn := func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -344,7 +345,7 @@ func TestPkgerHTTPServer(t *testing.T) {
t.Run("apply a pkg", func(t *testing.T) {
svc := &fakeSVC{
DryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
}
@ -357,7 +358,7 @@ func TestPkgerHTTPServer(t *testing.T) {
}
return sum, diff, nil
},
ApplyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) {
applyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
require.NoError(t, o(&opt))
@ -391,6 +392,115 @@ func TestPkgerHTTPServer(t *testing.T) {
assert.Nil(t, resp.Errors)
})
})
t.Run("create a stack", func(t *testing.T) {
t.Run("should successfully return with valid req body", func(t *testing.T) {
svc := &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
stack.ID = 3
stack.CreatedAt = time.Now()
stack.UpdatedAt = time.Now()
return stack, nil
},
}
pkgHandler := pkger.NewHTTPServer(zap.NewNop(), svc)
svr := newMountedHandler(pkgHandler, 1)
reqBody := pkger.ReqCreateStack{
OrgID: influxdb.ID(3).String(),
Name: "threeve",
Description: "desc",
URLs: []string{"http://example.com"},
}
testttp.
PostJSON(t, "/api/v2/packages/stacks", reqBody).
Headers("Content-Type", "application/json").
Do(svr).
ExpectStatus(http.StatusCreated).
ExpectBody(func(buf *bytes.Buffer) {
var resp pkger.RespCreateStack
decodeBody(t, buf, &resp)
assert.NotZero(t, resp.ID)
assert.Equal(t, reqBody.OrgID, resp.OrgID)
assert.Equal(t, reqBody.Name, resp.Name)
assert.Equal(t, reqBody.Description, resp.Description)
assert.Equal(t, reqBody.URLs, resp.URLs)
assert.NotZero(t, resp.CRUDLog)
})
})
t.Run("error cases", func(t *testing.T) {
tests := []struct {
name string
reqBody pkger.ReqCreateStack
expectedStatus int
svc pkger.SVC
}{
{
name: "bad org id",
reqBody: pkger.ReqCreateStack{
OrgID: "invalid id",
},
expectedStatus: http.StatusBadRequest,
},
{
name: "bad url",
reqBody: pkger.ReqCreateStack{
OrgID: influxdb.ID(3).String(),
URLs: []string{"invalid @% url"},
},
expectedStatus: http.StatusBadRequest,
},
{
name: "translates svc conflict error",
reqBody: pkger.ReqCreateStack{OrgID: influxdb.ID(3).String()},
svc: &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
return pkger.Stack{}, &influxdb.Error{Code: influxdb.EConflict}
},
},
expectedStatus: http.StatusUnprocessableEntity,
},
{
name: "translates svc internal error",
reqBody: pkger.ReqCreateStack{OrgID: influxdb.ID(3).String()},
svc: &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
return pkger.Stack{}, &influxdb.Error{Code: influxdb.EInternal}
},
},
expectedStatus: http.StatusInternalServerError,
},
}
for _, tt := range tests {
fn := func(t *testing.T) {
svc := tt.svc
if svc == nil {
svc = &fakeSVC{
initStack: func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
return stack, nil
},
}
}
pkgHandler := pkger.NewHTTPServer(zap.NewNop(), svc)
svr := newMountedHandler(pkgHandler, 1)
testttp.
PostJSON(t, "/api/v2/packages/stacks", tt.reqBody).
Headers("Content-Type", "application/json").
Do(svr).
ExpectStatus(tt.expectedStatus)
}
t.Run(tt.name, fn)
}
})
})
}
func bucketPkgKinds(t *testing.T, encoding pkger.Encoding) []byte {
@ -471,14 +581,18 @@ func decodeBody(t *testing.T, r io.Reader, v interface{}) {
}
type fakeSVC struct {
DryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
ApplyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error)
initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error)
}
var _ pkger.SVC = (*fakeSVC)(nil)
func (f *fakeSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
panic("not implemented")
if f.initStack == nil {
panic("not implemented")
}
return f.initStack(ctx, userID, stack)
}
func (f *fakeSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
@ -486,18 +600,18 @@ func (f *fakeSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn
}
func (f *fakeSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if f.DryRunFn == nil {
if f.dryRunFn == nil {
panic("not implemented")
}
return f.DryRunFn(ctx, orgID, userID, pkg, opts...)
return f.dryRunFn(ctx, orgID, userID, pkg, opts...)
}
func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) {
if f.ApplyFn == nil {
if f.applyFn == nil {
panic("not implemented")
}
return f.ApplyFn(ctx, orgID, userID, pkg, opts...)
return f.applyFn(ctx, orgID, userID, pkg, opts...)
}
func newMountedHandler(rh kithttp.ResourceHandler, userID influxdb.ID) chi.Router {

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb"
ierrors "github.com/influxdata/influxdb/kit/errors"
"github.com/influxdata/influxdb/snowflake"
"go.uber.org/zap"
)
@ -226,6 +227,8 @@ func NewService(opts ...ServiceSetterFn) *Service {
opt := &serviceOpt{
logger: zap.NewNop(),
applyReqLimit: 5,
idGen: snowflake.NewDefaultIDGenerator(),
timeGen: influxdb.RealTimeGenerator{},
}
for _, o := range opts {
o(opt)

View File

@ -43,7 +43,7 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack
return
}
}(time.Now())
return s.next.InitStack(ctx, userID, stack)
return s.next.InitStack(ctx, userID, newStack)
}
func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) {