feat(pkger): add the ability to remove a stack and all its associated resources
closes: #17554pull/17898/head
parent
aac3a4ee81
commit
89e34365bb
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
### Features
|
||||
|
||||
1. [17934](https://github.com/influxdata/influxdb/pull/17934): Add ability to delete a stack and all the resources associated with it
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
1. [17906](https://github.com/influxdata/influxdb/pull/17906): Ensure UpdateUser cleans up the index when updating names
|
||||
|
|
|
|||
|
|
@ -708,6 +708,8 @@ type fakePkgSVC struct {
|
|||
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
|
||||
}
|
||||
|
||||
var _ pkger.SVC = (*fakePkgSVC)(nil)
|
||||
|
||||
func (f *fakePkgSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
|
||||
if f.initStackFn != nil {
|
||||
return f.initStackFn(ctx, userID, stack)
|
||||
|
|
@ -719,6 +721,10 @@ func (f *fakePkgSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter p
|
|||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (f *fakePkgSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
|
||||
if f.createFn != nil {
|
||||
return f.createFn(ctx, setters...)
|
||||
|
|
|
|||
|
|
@ -229,6 +229,122 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
})
|
||||
})
|
||||
|
||||
t.Run("delete a stack", func(t *testing.T) {
|
||||
t.Run("should delete the stack and all resources associated with it", func(t *testing.T) {
|
||||
newStack, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{
|
||||
OrgID: l.Org.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
newEndpointPkgName := "non_existent_endpoint"
|
||||
allResourcesPkg := newPkg(
|
||||
newBucketObject("non_existent_bucket", "", ""),
|
||||
newCheckDeadmanObject(t, "non_existent_check", "", time.Minute),
|
||||
newDashObject("non_existent_dash", "", ""),
|
||||
newEndpointHTTP(newEndpointPkgName, "", ""),
|
||||
newLabelObject("non_existent_label", "", "", ""),
|
||||
newRuleObject(t, "non_existent_rule", "", newEndpointPkgName, ""),
|
||||
newTaskObject("non_existent_task", "", ""),
|
||||
newTelegrafObject("non_existent_tele", "", ""),
|
||||
newVariableObject("non_existent_var", "", ""),
|
||||
)
|
||||
|
||||
sum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, allResourcesPkg, pkger.ApplyWithStackID(newStack.ID))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, sum.Buckets, 1)
|
||||
assert.NotZero(t, sum.Buckets[0].ID)
|
||||
require.Len(t, sum.Checks, 1)
|
||||
assert.NotZero(t, sum.Checks[0].Check.GetID())
|
||||
require.Len(t, sum.Dashboards, 1)
|
||||
assert.NotZero(t, sum.Dashboards[0].ID)
|
||||
require.Len(t, sum.Labels, 1)
|
||||
assert.NotZero(t, sum.Labels[0].ID)
|
||||
require.Len(t, sum.NotificationEndpoints, 1)
|
||||
assert.NotZero(t, sum.NotificationEndpoints[0].NotificationEndpoint.GetID())
|
||||
require.Len(t, sum.NotificationRules, 1)
|
||||
assert.NotZero(t, sum.NotificationRules[0].ID)
|
||||
require.Len(t, sum.Tasks, 1)
|
||||
assert.NotZero(t, sum.Tasks[0].ID)
|
||||
require.Len(t, sum.TelegrafConfigs, 1)
|
||||
assert.NotZero(t, sum.TelegrafConfigs[0].TelegrafConfig.ID)
|
||||
require.Len(t, sum.Variables, 1)
|
||||
assert.NotZero(t, sum.Variables[0].ID)
|
||||
|
||||
err = svc.DeleteStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
|
||||
OrgID: l.Org.ID,
|
||||
UserID: l.User.ID,
|
||||
StackID: newStack.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
matchingStacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{
|
||||
StackIDs: []influxdb.ID{newStack.ID},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, matchingStacks)
|
||||
|
||||
_, err = resourceCheck.getBucket(t, byID(influxdb.ID(sum.Buckets[0].ID)))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getCheck(t, byID(sum.Checks[0].Check.GetID()))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getDashboard(t, byID(influxdb.ID(sum.Dashboards[0].ID)))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getLabel(t, byID(influxdb.ID(sum.Labels[0].ID)))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getEndpoint(t, byID(sum.NotificationEndpoints[0].NotificationEndpoint.GetID()))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getRule(t, byID(influxdb.ID(sum.NotificationRules[0].ID)))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getTask(t, byID(influxdb.ID(sum.Tasks[0].ID)))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getTelegrafConfig(t, byID(sum.TelegrafConfigs[0].TelegrafConfig.ID))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getVariable(t, byID(influxdb.ID(sum.Variables[0].ID)))
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("that has been deleted should be successful", func(t *testing.T) {
|
||||
newStack, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{
|
||||
OrgID: l.Org.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = svc.DeleteStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
|
||||
OrgID: l.Org.ID,
|
||||
UserID: l.User.ID,
|
||||
StackID: newStack.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// delete same stack
|
||||
err = svc.DeleteStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
|
||||
OrgID: l.Org.ID,
|
||||
UserID: l.User.ID,
|
||||
StackID: newStack.ID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("that doesn't exist should be successful", func(t *testing.T) {
|
||||
// delete stack that doesn't exist
|
||||
err := svc.DeleteStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
|
||||
OrgID: l.Org.ID,
|
||||
UserID: l.User.ID,
|
||||
StackID: 9000,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("apply with only a stackID succeeds when stack has URLs", func(t *testing.T) {
|
||||
svr := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
|
||||
pkg := newPkg(newBucketObject("bucket_0", "", ""))
|
||||
|
|
@ -2296,6 +2412,12 @@ type (
|
|||
getResourceOptFn func() getResourceOpt
|
||||
)
|
||||
|
||||
func byID(id influxdb.ID) getResourceOptFn {
|
||||
return func() getResourceOpt {
|
||||
return getResourceOpt{id: id}
|
||||
}
|
||||
}
|
||||
|
||||
func byName(name string) getResourceOptFn {
|
||||
return func() getResourceOpt {
|
||||
return getResourceOpt{name: name}
|
||||
|
|
@ -2507,8 +2629,11 @@ func (r resourceChecker) getLabel(t *testing.T, getOpt getResourceOptFn) (influx
|
|||
default:
|
||||
require.Fail(t, "did not provide any get option")
|
||||
}
|
||||
if err != nil {
|
||||
return influxdb.Label{}, err
|
||||
}
|
||||
|
||||
return *label, err
|
||||
return *label, nil
|
||||
}
|
||||
|
||||
func (r resourceChecker) mustGetLabel(t *testing.T, getOpt getResourceOptFn) influxdb.Label {
|
||||
|
|
@ -2700,8 +2825,10 @@ func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (inf
|
|||
default:
|
||||
require.Fail(t, "did not provide any get option")
|
||||
}
|
||||
|
||||
return *variable, err
|
||||
if err != nil {
|
||||
return influxdb.Variable{}, err
|
||||
}
|
||||
return *variable, nil
|
||||
}
|
||||
|
||||
func (r resourceChecker) mustGetVariable(t *testing.T, getOpt getResourceOptFn) influxdb.Variable {
|
||||
|
|
|
|||
|
|
@ -4510,6 +4510,34 @@ paths:
|
|||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
/packages/stacks/{stack_id}:
|
||||
delete:
|
||||
operationId: DeleteStack
|
||||
tags:
|
||||
- InfluxPackages
|
||||
summary: Delete a stack and remove all its associated resources
|
||||
parameters:
|
||||
- in: path
|
||||
name: stack_id
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
description: The stack id to be removed
|
||||
- in: query
|
||||
name: orgID
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
description: The organization id of the user
|
||||
responses:
|
||||
'204':
|
||||
description: Stack and all its associated resources are deleted
|
||||
default:
|
||||
description: Unexpected error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
/tasks:
|
||||
get:
|
||||
operationId: GetTasks
|
||||
|
|
|
|||
|
|
@ -55,6 +55,13 @@ func (s *HTTPRemoteService) InitStack(ctx context.Context, userID influxdb.ID, s
|
|||
return newStack, nil
|
||||
}
|
||||
|
||||
func (s *HTTPRemoteService) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
|
||||
return s.Client.
|
||||
Delete(RoutePrefix, "stacks", identifiers.StackID.String()).
|
||||
QueryParams([2]string{"orgID", identifiers.OrgID.String()}).
|
||||
Do(ctx)
|
||||
}
|
||||
|
||||
func (s *HTTPRemoteService) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
|
||||
queryParams := [][2]string{{"orgID", orgID.String()}}
|
||||
for _, name := range f.Names {
|
||||
|
|
|
|||
|
|
@ -48,11 +48,14 @@ func NewHTTPServer(log *zap.Logger, svc SVC) *HTTPServer {
|
|||
{
|
||||
r.With(middleware.AllowContentType("text/yml", "application/x-yaml", "application/json")).
|
||||
Post("/", svr.createPkg)
|
||||
|
||||
r.With(middleware.SetHeader("Content-Type", "application/json; charset=utf-8")).
|
||||
Post("/apply", svr.applyPkg)
|
||||
|
||||
r.Route("/stacks", func(r chi.Router) {
|
||||
r.Post("/", svr.createStack)
|
||||
r.Get("/", svr.listStacks)
|
||||
r.Delete("/{stack_id}", svr.deleteStack)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -203,6 +206,63 @@ func (s *HTTPServer) createStack(w http.ResponseWriter, r *http.Request) {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *HTTPServer) deleteStack(w http.ResponseWriter, r *http.Request) {
|
||||
orgID, err := getRequiredOrgIDFromQuery(r.URL.Query())
|
||||
if err != nil {
|
||||
s.api.Err(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
stackID, err := influxdb.IDFromString(chi.URLParam(r, "stack_id"))
|
||||
if err != nil {
|
||||
s.api.Err(w, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "the stack id provided in the path was invalid",
|
||||
Err: err,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
auth, err := pctx.GetAuthorizer(r.Context())
|
||||
if err != nil {
|
||||
s.api.Err(w, err)
|
||||
return
|
||||
}
|
||||
userID := auth.GetUserID()
|
||||
|
||||
err = s.svc.DeleteStack(r.Context(), struct{ OrgID, UserID, StackID influxdb.ID }{
|
||||
OrgID: orgID,
|
||||
UserID: userID,
|
||||
StackID: *stackID,
|
||||
})
|
||||
if err != nil {
|
||||
s.api.Err(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
s.api.Respond(w, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func getRequiredOrgIDFromQuery(q url.Values) (influxdb.ID, error) {
|
||||
orgIDRaw := q.Get("orgID")
|
||||
if orgIDRaw == "" {
|
||||
return 0, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "the orgID query param is required",
|
||||
}
|
||||
}
|
||||
|
||||
orgID, err := influxdb.IDFromString(orgIDRaw)
|
||||
if err != nil {
|
||||
return 0, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "the orgID query param was invalid",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return *orgID, nil
|
||||
}
|
||||
|
||||
// ReqCreateOrgIDOpt provides options to export resources by organization id.
|
||||
type ReqCreateOrgIDOpt struct {
|
||||
OrgID string `json:"orgID"`
|
||||
|
|
|
|||
|
|
@ -837,6 +837,10 @@ func (f *fakeSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger
|
|||
return f.initStack(ctx, userID, stack)
|
||||
}
|
||||
|
||||
func (f *fakeSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
|
||||
panic("not implemented yet")
|
||||
}
|
||||
|
||||
func (f *fakeSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) {
|
||||
if f.listStacksFn == nil {
|
||||
panic("not implemented")
|
||||
|
|
|
|||
|
|
@ -59,6 +59,7 @@ const ResourceTypeStack influxdb.ResourceType = "stack"
|
|||
// SVC is the packages service interface.
|
||||
type SVC interface {
|
||||
InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error)
|
||||
DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error
|
||||
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)
|
||||
|
||||
CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error)
|
||||
|
|
@ -298,6 +299,39 @@ func (s *Service) InitStack(ctx context.Context, userID influxdb.ID, stack Stack
|
|||
return stack, nil
|
||||
}
|
||||
|
||||
// DeleteStack removes a stack and all the resources that have are associated with the stack.
|
||||
func (s *Service) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (e error) {
|
||||
stack, err := s.store.ReadStackByID(ctx, identifiers.StackID)
|
||||
if influxdb.ErrorCode(err) == influxdb.ENotFound {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stack.OrgID != identifiers.OrgID {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EConflict,
|
||||
Msg: "you do not have access to given stack ID",
|
||||
}
|
||||
}
|
||||
|
||||
// providing empty Pkg will remove all applied resources
|
||||
state, err := s.dryRun(ctx, identifiers.OrgID, new(Pkg), applyOptFromOptFns(ApplyWithStackID(identifiers.StackID)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
coordinator := &rollbackCoordinator{sem: make(chan struct{}, s.applyReqLimit)}
|
||||
defer coordinator.rollback(s.log, &e, identifiers.OrgID)
|
||||
|
||||
err = s.applyState(ctx, coordinator, identifiers.OrgID, identifiers.UserID, state, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.store.DeleteStack(ctx, identifiers.StackID)
|
||||
}
|
||||
|
||||
// ListFilter are filter options for filtering stacks from being returned.
|
||||
type ListFilter struct {
|
||||
StackIDs []influxdb.ID
|
||||
|
|
@ -685,7 +719,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opt A
|
|||
// will be skipped, and won't bleed into the dry run here. We can now return
|
||||
// a error (parseErr) and valid diff/summary.
|
||||
var parseErr error
|
||||
err := pkg.Validate()
|
||||
err := pkg.Validate(ValidWithoutResources())
|
||||
if err != nil && !IsParseErr(err) {
|
||||
return nil, internalErr(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta
|
|||
return s.next.InitStack(ctx, userID, newStack)
|
||||
}
|
||||
|
||||
func (s *authMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
|
||||
err := s.authAgent.IsWritable(ctx, identifiers.OrgID, ResourceTypeStack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.next.DeleteStack(ctx, identifiers)
|
||||
}
|
||||
|
||||
func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
|
||||
err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -43,6 +43,24 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack
|
|||
return s.next.InitStack(ctx, userID, newStack)
|
||||
}
|
||||
|
||||
func (s *loggingMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (err error) {
|
||||
defer func(start time.Time) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Error(
|
||||
"failed to delete stack",
|
||||
zap.Error(err),
|
||||
zap.Stringer("orgID", identifiers.OrgID),
|
||||
zap.Stringer("userID", identifiers.OrgID),
|
||||
zap.Stringer("stackID", identifiers.StackID),
|
||||
zap.Duration("took", time.Since(start)),
|
||||
)
|
||||
}(time.Now())
|
||||
return s.next.DeleteStack(ctx, identifiers)
|
||||
}
|
||||
|
||||
func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) {
|
||||
defer func(start time.Time) {
|
||||
if err == nil {
|
||||
|
|
|
|||
|
|
@ -33,6 +33,11 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack
|
|||
return stack, rec(err)
|
||||
}
|
||||
|
||||
func (s *mwMetrics) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
|
||||
rec := s.rec.Record("delete_stack")
|
||||
return rec(s.next.DeleteStack(ctx, identifiers))
|
||||
}
|
||||
|
||||
func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
|
||||
rec := s.rec.Record("list_stacks")
|
||||
stacks, err := s.next.ListStacks(ctx, orgID, f)
|
||||
|
|
|
|||
|
|
@ -27,6 +27,12 @@ func (s *traceMW) InitStack(ctx context.Context, userID influxdb.ID, newStack St
|
|||
return s.next.InitStack(ctx, userID, newStack)
|
||||
}
|
||||
|
||||
func (s *traceMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
|
||||
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "DeleteStack")
|
||||
defer span.Finish()
|
||||
return s.next.DeleteStack(ctx, identifiers)
|
||||
}
|
||||
|
||||
func (s *traceMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
|
||||
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ListStacks")
|
||||
defer span.Finish()
|
||||
|
|
|
|||
Loading…
Reference in New Issue