feat(pkger): extend stacks API with uninstall ability

this new ability removes the resources associated with the stack
and adds the uninstall event.
pull/18912/head
Johnny Steenbergen 2020-07-09 16:33:42 -07:00 committed by Johnny Steenbergen
parent 8e922ed14e
commit 944b097628
13 changed files with 294 additions and 54 deletions

View File

@ -3,6 +3,7 @@
### Features
1. [18888](https://github.com/influxdata/influxdb/pull/18888): Add event source to influx stack operations
1. [18910](https://github.com/influxdata/influxdb/pull/18910): Add uninstall functionality for stacks
### Bug Fixes

View File

@ -1780,7 +1780,7 @@ func writeStackRows(tabW *internal.TabWriter, stacks ...pkger.Stack) {
tabW.Write(map[string]interface{}{
"ID": stack.ID,
"OrgID": stack.OrgID,
"Active": latest.EventType != pkger.StackEventDelete,
"Active": latest.EventType != pkger.StackEventUninstalled,
"Name": latest.Name,
"Description": latest.Description,
"Num Resources": len(latest.Resources),

View File

@ -795,6 +795,10 @@ func (f *fakePkgSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter p
panic("not implemented")
}
func (f *fakePkgSVC) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (pkger.Stack, error) {
panic("not implemeted")
}
func (f *fakePkgSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
panic("not implemented")
}

View File

@ -250,6 +250,55 @@ func TestLauncher_Pkger(t *testing.T) {
return obj
}
validateAllResourcesRemoved := func(t *testing.T, summary pkger.Summary) {
t.Helper()
for _, b := range summary.Buckets {
_, err := resourceCheck.getBucket(t, bySafeID(b.ID))
assertErrorCode(t, influxdb.ENotFound, err)
}
for _, c := range summary.Checks {
_, err := resourceCheck.getCheck(t, byID(c.Check.GetID()))
assert.Error(t, err)
}
for _, d := range summary.Dashboards {
_, err := resourceCheck.getDashboard(t, bySafeID(d.ID))
assertErrorCode(t, influxdb.ENotFound, err)
}
for _, l := range summary.Labels {
_, err := resourceCheck.getLabel(t, bySafeID(l.ID))
assertErrorCode(t, influxdb.ENotFound, err)
}
for _, e := range summary.NotificationEndpoints {
_, err := resourceCheck.getEndpoint(t, byID(e.NotificationEndpoint.GetID()))
assert.Error(t, err)
}
for _, r := range summary.NotificationRules {
_, err := resourceCheck.getRule(t, bySafeID(r.ID))
assert.Error(t, err)
}
for _, ta := range summary.Tasks {
_, err := resourceCheck.getTask(t, bySafeID(ta.ID))
assert.Error(t, err)
}
for _, te := range summary.TelegrafConfigs {
_, err := resourceCheck.getTelegrafConfig(t, byID(te.TelegrafConfig.ID))
assert.Error(t, err)
}
for _, v := range summary.Variables {
_, err := resourceCheck.getVariable(t, bySafeID(v.ID))
assertErrorCode(t, influxdb.ENotFound, err)
}
}
t.Run("managing pkg lifecycle with stacks", func(t *testing.T) {
t.Run("list stacks", func(t *testing.T) {
stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{})
@ -313,6 +362,69 @@ func TestLauncher_Pkger(t *testing.T) {
cleanup()
})
t.Run("uninstall a stack", func(t *testing.T) {
t.Run("should remove all resources associated with it", func(t *testing.T) {
newStack, cleanup := newStackFn(t, pkger.StackCreate{})
defer cleanup()
newEndpointPkgName := "non-existent-endpoint"
allResourcesPkg := newTemplate(
newBucketObject("non-existent-bucket", "", ""),
newCheckDeadmanObject(t, "non-existent-check", "", time.Minute),
newDashObject("non-existent-dash", "", ""),
newEndpointHTTP(newEndpointPkgName, "", ""),
newLabelObject("non-existent-label", "", "", ""),
newRuleObject(t, "non-existent-rule", "", newEndpointPkgName, ""),
newTaskObject("non-existent-task", "", ""),
newTelegrafObject("non-existent-tele", "", ""),
newVariableObject("non-existent-var", "", ""),
)
impact, err := svc.Apply(ctx, l.Org.ID, l.User.ID,
pkger.ApplyWithTemplate(allResourcesPkg),
pkger.ApplyWithStackID(newStack.ID),
)
require.NoError(t, err)
sum := impact.Summary
require.Len(t, sum.Buckets, 1)
assert.NotZero(t, sum.Buckets[0].ID)
require.Len(t, sum.Checks, 1)
assert.NotZero(t, sum.Checks[0].Check.GetID())
require.Len(t, sum.Dashboards, 1)
assert.NotZero(t, sum.Dashboards[0].ID)
require.Len(t, sum.Labels, 1)
assert.NotZero(t, sum.Labels[0].ID)
require.Len(t, sum.NotificationEndpoints, 1)
assert.NotZero(t, sum.NotificationEndpoints[0].NotificationEndpoint.GetID())
require.Len(t, sum.NotificationRules, 1)
assert.NotZero(t, sum.NotificationRules[0].ID)
require.Len(t, sum.Tasks, 1)
assert.NotZero(t, sum.Tasks[0].ID)
require.Len(t, sum.TelegrafConfigs, 1)
assert.NotZero(t, sum.TelegrafConfigs[0].TelegrafConfig.ID)
require.Len(t, sum.Variables, 1)
assert.NotZero(t, sum.Variables[0].ID)
_, err = svc.UninstallStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
OrgID: l.Org.ID,
UserID: l.User.ID,
StackID: newStack.ID,
})
require.NoError(t, err)
matchingStack, err := svc.ReadStack(ctx, newStack.ID)
require.NoError(t, err)
ev := matchingStack.LatestEvent()
assert.Equal(t, pkger.StackEventUninstalled, ev.EventType)
assert.Empty(t, ev.Resources)
validateAllResourcesRemoved(t, sum)
})
})
t.Run("delete a stack", func(t *testing.T) {
t.Run("should delete the stack and all resources associated with it", func(t *testing.T) {
newStack, cleanup := newStackFn(t, pkger.StackCreate{})
@ -358,42 +470,14 @@ func TestLauncher_Pkger(t *testing.T) {
require.Len(t, sum.Variables, 1)
assert.NotZero(t, sum.Variables[0].ID)
deleteStackFn(t, newStack.ID)
matchingStack, err := svc.ReadStack(ctx, newStack.ID)
err = svc.DeleteStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
OrgID: l.Org.ID,
UserID: l.User.ID,
StackID: newStack.ID,
})
require.NoError(t, err)
ev := matchingStack.LatestEvent()
assert.Equal(t, pkger.StackEventDelete, ev.EventType)
assert.Empty(t, ev.Resources)
assert.Empty(t, ev.Sources)
_, err = resourceCheck.getBucket(t, byID(influxdb.ID(sum.Buckets[0].ID)))
assert.Error(t, err)
_, err = resourceCheck.getCheck(t, byID(sum.Checks[0].Check.GetID()))
assert.Error(t, err)
_, err = resourceCheck.getDashboard(t, byID(influxdb.ID(sum.Dashboards[0].ID)))
assert.Error(t, err)
_, err = resourceCheck.getLabel(t, byID(influxdb.ID(sum.Labels[0].ID)))
assert.Error(t, err)
_, err = resourceCheck.getEndpoint(t, byID(sum.NotificationEndpoints[0].NotificationEndpoint.GetID()))
assert.Error(t, err)
_, err = resourceCheck.getRule(t, byID(influxdb.ID(sum.NotificationRules[0].ID)))
assert.Error(t, err)
_, err = resourceCheck.getTask(t, byID(influxdb.ID(sum.Tasks[0].ID)))
assert.Error(t, err)
_, err = resourceCheck.getTelegrafConfig(t, byID(sum.TelegrafConfigs[0].TelegrafConfig.ID))
assert.Error(t, err)
_, err = resourceCheck.getVariable(t, byID(influxdb.ID(sum.Variables[0].ID)))
assert.Error(t, err)
validateAllResourcesRemoved(t, sum)
})
t.Run("that has already been deleted should be successful", func(t *testing.T) {
@ -2178,6 +2262,7 @@ func TestLauncher_Pkger(t *testing.T) {
t.Run("errors incurred during application of package rolls back to state before package", func(t *testing.T) {
stacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{})
require.NoError(t, err)
require.Empty(t, stacks)
svc := pkger.NewService(
pkger.WithBucketSVC(l.BucketService(t)),
@ -2249,7 +2334,7 @@ func TestLauncher_Pkger(t *testing.T) {
afterStacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{})
require.NoError(t, err)
require.Len(t, afterStacks, len(stacks))
require.Empty(t, afterStacks)
})
hasLabelAssociations := func(t *testing.T, associations []pkger.SummaryLabel, numAss int, expectedNames ...string) {
@ -3979,6 +4064,12 @@ func (f *fakeRuleStore) CreateNotificationRule(ctx context.Context, nr influxdb.
return f.NotificationRuleStore.CreateNotificationRule(ctx, nr, userID)
}
func assertErrorCode(t *testing.T, expected string, err error) {
t.Helper()
assert.Error(t, err)
assert.Equal(t, expected, influxdb.ErrorCode(err))
}
type resourceChecker struct {
tl *TestLauncher
}
@ -4008,6 +4099,12 @@ func byName(name string) getResourceOptFn {
}
}
func bySafeID(id pkger.SafeID) getResourceOptFn {
return func() getResourceOpt {
return getResourceOpt{id: influxdb.ID(id)}
}
}
func (r resourceChecker) getBucket(t *testing.T, getOpt getResourceOptFn) (influxdb.Bucket, error) {
t.Helper()

View File

@ -4725,7 +4725,7 @@ paths:
operationId: UpdateStack
tags:
- InfluxDB Templates
summary: Update a an InfluxDB Stack
summary: Update an InfluxDB Stack
parameters:
- in: path
name: stack_id
@ -4801,6 +4801,32 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/stacks/{stack_id}/uninstall:
post:
operationId: UninstallStack
tags:
- InfluxDB Templates
summary: Uninstall an InfluxDB Stack
parameters:
- in: path
name: stack_id
required: true
schema:
type: string
description: The stack id
responses:
"200":
description: Influx stack uninstalled
content:
application/json:
schema:
$ref: "#/components/schemas/Stack"
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/templates/apply:
post:
operationId: ApplyTemplate

View File

@ -37,6 +37,20 @@ func (s *HTTPRemoteService) InitStack(ctx context.Context, userID influxdb.ID, s
return convertRespStackToStack(respBody)
}
func (s *HTTPRemoteService) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (Stack, error) {
var respBody RespStack
err := s.Client.
Post(httpc.BodyEmpty, RoutePrefixStacks, identifiers.StackID.String(), "/uninstall").
QueryParams([2]string{"orgID", identifiers.OrgID.String()}).
DecodeJSON(&respBody).
Do(ctx)
if err != nil {
return Stack{}, err
}
return convertRespStackToStack(respBody)
}
func (s *HTTPRemoteService) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
return s.Client.
Delete(RoutePrefixStacks, identifiers.StackID.String()).
@ -290,8 +304,8 @@ func convertRespStackEvent(ev RespStackEvent) (StackEvent, error) {
eventType := StackEventCreate
switch ev.EventType {
case "delete":
eventType = StackEventDelete
case "uninstall", "delete": // delete is included to maintain backwards compatibility
eventType = StackEventUninstalled
case "update":
eventType = StackEventUpdate
}

View File

@ -1350,6 +1350,10 @@ func (f *fakeSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger
return f.initStackFn(ctx, userID, stack)
}
func (f *fakeSVC) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (pkger.Stack, error) {
panic("not implemented")
}
func (f *fakeSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
panic("not implemented yet")
}

View File

@ -40,6 +40,7 @@ func NewHTTPServerStacks(log *zap.Logger, svc SVC) *HTTPServerStacks {
r.Get("/", svr.readStack)
r.Delete("/", svr.deleteStack)
r.Patch("/", svr.updateStack)
r.Post("/uninstall", svr.uninstallStack)
})
}
@ -244,11 +245,10 @@ func (s *HTTPServerStacks) deleteStack(w http.ResponseWriter, r *http.Request) {
s.api.Err(w, r, err)
return
}
userID := auth.GetUserID()
err = s.svc.DeleteStack(r.Context(), struct{ OrgID, UserID, StackID influxdb.ID }{
OrgID: orgID,
UserID: userID,
UserID: auth.GetUserID(),
StackID: stackID,
})
if err != nil {
@ -259,6 +259,38 @@ func (s *HTTPServerStacks) deleteStack(w http.ResponseWriter, r *http.Request) {
s.api.Respond(w, r, http.StatusNoContent, nil)
}
func (s *HTTPServerStacks) uninstallStack(w http.ResponseWriter, r *http.Request) {
orgID, err := getRequiredOrgIDFromQuery(r.URL.Query())
if err != nil {
s.api.Err(w, r, err)
return
}
stackID, err := stackIDFromReq(r)
if err != nil {
s.api.Err(w, r, err)
return
}
auth, err := pctx.GetAuthorizer(r.Context())
if err != nil {
s.api.Err(w, r, err)
return
}
stack, err := s.svc.UninstallStack(r.Context(), struct{ OrgID, UserID, StackID influxdb.ID }{
OrgID: orgID,
UserID: auth.GetUserID(),
StackID: stackID,
})
if err != nil {
s.api.Err(w, r, err)
return
}
s.api.Respond(w, r, http.StatusOK, convertStackToRespStack(stack))
}
func (s *HTTPServerStacks) readStack(w http.ResponseWriter, r *http.Request) {
stackID, err := stackIDFromReq(r)
if err != nil {

View File

@ -104,15 +104,15 @@ type StackEventType uint
const (
StackEventCreate StackEventType = iota
StackEventUpdate
StackEventDelete
StackEventUninstalled
)
func (e StackEventType) String() string {
switch e {
case StackEventCreate:
return "create"
case StackEventDelete:
return "delete"
case StackEventUninstalled:
return "uninstall"
case StackEventUpdate:
return "update"
default:
@ -125,6 +125,7 @@ const ResourceTypeStack influxdb.ResourceType = "stack"
// SVC is the packages service interface.
type SVC interface {
InitStack(ctx context.Context, userID influxdb.ID, stack StackCreate) (Stack, error)
UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (Stack, error)
DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)
ReadStack(ctx context.Context, id influxdb.ID) (Stack, error)
@ -386,17 +387,45 @@ func (s *Service) InitStack(ctx context.Context, userID influxdb.ID, stCreate St
return newStack, nil
}
// UninstallStack will remove all resources associated with the stack.
func (s *Service) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (Stack, error) {
uninstalledStack, err := s.uninstallStack(ctx, identifiers)
if err != nil {
return Stack{}, err
}
ev := uninstalledStack.LatestEvent()
ev.EventType = StackEventUninstalled
ev.Resources = nil
ev.UpdatedAt = s.timeGen.Now()
uninstalledStack.Events = append(uninstalledStack.Events, ev)
if err := s.store.UpdateStack(ctx, uninstalledStack); err != nil {
s.log.Error("unable to update stack after uninstalling resources", zap.Error(err))
}
return uninstalledStack, nil
}
// DeleteStack removes a stack and all the resources that have are associated with the stack.
func (s *Service) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (e error) {
stack, err := s.store.ReadStackByID(ctx, identifiers.StackID)
deletedStack, err := s.uninstallStack(ctx, identifiers)
if influxdb.ErrorCode(err) == influxdb.ENotFound {
return nil
}
if err != nil {
return err
}
return s.store.DeleteStack(ctx, deletedStack.ID)
}
func (s *Service) uninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (_ Stack, e error) {
stack, err := s.store.ReadStackByID(ctx, identifiers.StackID)
if err != nil {
return Stack{}, err
}
if stack.OrgID != identifiers.OrgID {
return &influxdb.Error{
return Stack{}, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "you do not have access to given stack ID",
}
@ -405,7 +434,7 @@ func (s *Service) DeleteStack(ctx context.Context, identifiers struct{ OrgID, Us
// providing empty template will remove all applied resources
state, err := s.dryRun(ctx, identifiers.OrgID, new(Template), applyOptFromOptFns(ApplyWithStackID(identifiers.StackID)))
if err != nil {
return err
return Stack{}, err
}
coordinator := newRollbackCoordinator(s.log, s.applyReqLimit)
@ -413,14 +442,9 @@ func (s *Service) DeleteStack(ctx context.Context, identifiers struct{ OrgID, Us
err = s.applyState(ctx, coordinator, identifiers.OrgID, identifiers.UserID, state, nil)
if err != nil {
return err
return Stack{}, err
}
stack.Events = append(stack.Events, StackEvent{
EventType: StackEventDelete,
UpdatedAt: s.timeGen.Now(),
})
return s.store.UpdateStack(ctx, stack)
return stack, nil
}
// ListFilter are filter options for filtering stacks from being returned.

View File

@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta
return s.next.InitStack(ctx, userID, newStack)
}
func (s *authMW) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (Stack, error) {
err := s.authAgent.IsWritable(ctx, identifiers.OrgID, ResourceTypeStack)
if err != nil {
return Stack{}, err
}
return s.next.UninstallStack(ctx, identifiers)
}
func (s *authMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
err := s.authAgent.IsWritable(ctx, identifiers.OrgID, ResourceTypeStack)
if err != nil {

View File

@ -43,6 +43,24 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack
return s.next.InitStack(ctx, userID, newStack)
}
func (s *loggingMW) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (_ Stack, err error) {
defer func(start time.Time) {
if err == nil {
return
}
s.logger.Error(
"failed to uninstall stack",
zap.Error(err),
zap.Stringer("orgID", identifiers.OrgID),
zap.Stringer("userID", identifiers.OrgID),
zap.Stringer("stackID", identifiers.StackID),
zap.Duration("took", time.Since(start)),
)
}(time.Now())
return s.next.UninstallStack(ctx, identifiers)
}
func (s *loggingMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (err error) {
defer func(start time.Time) {
if err == nil {

View File

@ -38,6 +38,12 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack
return stack, rec(err)
}
func (s *mwMetrics) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (Stack, error) {
rec := s.rec.Record("uninstall_stack")
stack, err := s.next.UninstallStack(ctx, identifiers)
return stack, rec(err)
}
func (s *mwMetrics) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
rec := s.rec.Record("delete_stack")
return rec(s.next.DeleteStack(ctx, identifiers))

View File

@ -27,6 +27,12 @@ func (s *traceMW) InitStack(ctx context.Context, userID influxdb.ID, newStack St
return s.next.InitStack(ctx, userID, newStack)
}
func (s *traceMW) UninstallStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (Stack, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
return s.next.UninstallStack(ctx, identifiers)
}
func (s *traceMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()