diff --git a/cmd/influx/pkg.go b/cmd/influx/pkg.go index 9bf8656dc6..62d817917a 100644 --- a/cmd/influx/pkg.go +++ b/cmd/influx/pkg.go @@ -1106,10 +1106,11 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) error { } if teles := sum.TelegrafConfigs; len(teles) > 0 { - headers := []string{"ID", "Name", "Description"} + headers := []string{"Package Name", "ID", "Resource Name", "Description"} tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string { t := teles[i] return []string{ + t.PkgName, t.TelegrafConfig.ID.String(), t.TelegrafConfig.Name, t.TelegrafConfig.Description, diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 73579cf5e1..311cc456da 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -114,6 +114,16 @@ func TestLauncher_Pkger(t *testing.T) { return obj } + newTelegrafObject := func(pkgName, name, description string) pkger.Object { + obj := pkger.TelegrafToObject("", influxdb.TelegrafConfig{ + Name: name, + Description: description, + Config: telegrafCfg, + }) + obj.SetMetadataName(pkgName) + return obj + } + newVariableObject := func(pkgName, name, description string) pkger.Object { obj := pkger.VariableToObject("", influxdb.Variable{ Name: name, @@ -165,6 +175,7 @@ func TestLauncher_Pkger(t *testing.T) { initialEndpointPkgName = "endzo" initialLabelPkgName = "labelino" initialTaskPkgName = "tap" + initialTelegrafPkgName = "teletype" initialVariablePkgName = "laces out dan" ) initialPkg := newPkg( @@ -174,6 +185,7 @@ func TestLauncher_Pkger(t *testing.T) { newEndpointHTTP(initialEndpointPkgName, "endpoint_0", "init desc"), newLabelObject(initialLabelPkgName, "label 1", "init desc", "#222eee"), newTaskObject(initialTaskPkgName, "task_0", "init desc"), + newTelegrafObject(initialTelegrafPkgName, "tele_0", "init desc"), newVariableObject(initialVariablePkgName, "var char", "init desc"), ) @@ -211,6 +223,11 @@ func TestLauncher_Pkger(t *testing.T) { assert.Equal(t, "task_0", sum.Tasks[0].Name) assert.Equal(t, "init desc", sum.Tasks[0].Description) + require.Len(t, sum.TelegrafConfigs, 1) + assert.NotZero(t, sum.TelegrafConfigs[0].TelegrafConfig.ID) + assert.Equal(t, "tele_0", sum.TelegrafConfigs[0].TelegrafConfig.Name) + assert.Equal(t, "init desc", sum.TelegrafConfigs[0].TelegrafConfig.Description) + require.Len(t, sum.Variables, 1) assert.NotZero(t, sum.Variables[0].ID) assert.Equal(t, "var char", sum.Variables[0].Name) @@ -233,9 +250,12 @@ func TestLauncher_Pkger(t *testing.T) { actualLabel := resourceCheck.mustGetLabel(t, byName("label 1")) assert.Equal(t, sum.Labels[0].ID, pkger.SafeID(actualLabel.ID)) - actualTask := resourceCheck.mustGetTask(t, byName("task_0")) + actualTask := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, "task_0")) assert.Equal(t, sum.Tasks[0].ID, pkger.SafeID(actualTask.ID)) + actualTele := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, "tele_0")) + assert.Equal(t, sum.TelegrafConfigs[0].TelegrafConfig.ID, actualTele.ID) + actualVar := resourceCheck.mustGetVariable(t, byName("var char")) assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID)) } @@ -248,6 +268,7 @@ func TestLauncher_Pkger(t *testing.T) { updateEndpointName = "new endpoint" updateLabelName = "new label" updateTaskName = "new task" + updateTelegrafName = "new telegraf" updateVariableName = "new variable" ) t.Run("apply pkg with stack id where resources change", func(t *testing.T) { @@ -258,6 +279,7 @@ func TestLauncher_Pkger(t *testing.T) { newEndpointHTTP(initialEndpointPkgName, updateEndpointName, ""), newLabelObject(initialLabelPkgName, updateLabelName, "", ""), newTaskObject(initialTaskPkgName, updateTaskName, ""), + newTelegrafObject(initialTelegrafPkgName, updateTelegrafName, ""), newVariableObject(initialVariablePkgName, updateVariableName, ""), ) sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt) @@ -288,6 +310,11 @@ func TestLauncher_Pkger(t *testing.T) { assert.Equal(t, initialSum.Tasks[0].ID, sum.Tasks[0].ID) assert.Equal(t, updateTaskName, sum.Tasks[0].Name) + require.Len(t, sum.TelegrafConfigs, 1) + updatedTele := sum.TelegrafConfigs[0].TelegrafConfig + assert.Equal(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, updatedTele.ID) + assert.Equal(t, updateTelegrafName, updatedTele.Name) + require.Len(t, sum.Variables, 1) assert.Equal(t, initialSum.Variables[0].ID, sum.Variables[0].ID) assert.Equal(t, updateVariableName, sum.Variables[0].Name) @@ -309,9 +336,12 @@ func TestLauncher_Pkger(t *testing.T) { actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName)) require.Equal(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID)) - actualTask := resourceCheck.mustGetTask(t, byName(updateTaskName)) + actualTask := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, updateTaskName)) require.Equal(t, initialSum.Tasks[0].ID, pkger.SafeID(actualTask.ID)) + actualTelegraf := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, updateTelegrafName)) + require.Equal(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, actualTelegraf.ID) + actualVar := resourceCheck.mustGetVariable(t, byName(updateVariableName)) assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID)) } @@ -349,6 +379,7 @@ func TestLauncher_Pkger(t *testing.T) { newCheckDeadmanObject(t, "z_check", "", time.Hour), newEndpointHTTP("z_endpoint_rolls_back", "", ""), newTaskObject("z_task_rolls_back", "", ""), + newTelegrafObject("z_telegraf_rolls_back", "", ""), newVariableObject("z_var_rolls_back", "", ""), ) _, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt) @@ -374,6 +405,9 @@ func TestLauncher_Pkger(t *testing.T) { actualTask := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, updateTaskName)) assert.NotEqual(t, initialSum.Tasks[0].ID, pkger.SafeID(actualTask.ID)) + actualTelegraf := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, updateTelegrafName)) + require.NotEqual(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, actualTelegraf.ID) + actualVariable := resourceCheck.mustGetVariable(t, byName(updateVariableName)) assert.NotEqual(t, initialSum.Variables[0].ID, pkger.SafeID(actualVariable.ID)) } @@ -397,6 +431,9 @@ func TestLauncher_Pkger(t *testing.T) { _, err = resourceCheck.getLabel(t, byName("z_label_roller")) assert.Error(t, err) + _, err = resourceCheck.getTelegrafConfig(t, byNameAndOrg(l.Org.ID, "z_telegraf_rolls_back")) + assert.Error(t, err) + _, err = resourceCheck.getVariable(t, byName("z_var_rolls_back")) assert.Error(t, err) } @@ -410,6 +447,7 @@ func TestLauncher_Pkger(t *testing.T) { newEndpointHTTP("non_existent_endpoint", "", ""), newLabelObject("non_existent_label", "", "", ""), newTaskObject("non_existent_task", "", ""), + newTelegrafObject("non_existent_tele", "", ""), newVariableObject("non_existent_var", "", ""), ) sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt) @@ -452,6 +490,13 @@ func TestLauncher_Pkger(t *testing.T) { defer resourceCheck.mustDeleteTask(t, influxdb.ID(sum.Tasks[0].ID)) assert.Equal(t, "non_existent_task", sum.Tasks[0].Name) + require.Len(t, sum.TelegrafConfigs, 1) + newTele := sum.TelegrafConfigs[0].TelegrafConfig + assert.NotEqual(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, newTele.ID) + assert.NotZero(t, newTele.ID) + defer resourceCheck.mustDeleteTelegrafConfig(t, newTele.ID) + assert.Equal(t, "non_existent_tele", newTele.Name) + require.Len(t, sum.Variables, 1) assert.NotEqual(t, initialSum.Variables[0].ID, sum.Variables[0].ID) assert.NotZero(t, sum.Variables[0].ID) @@ -461,23 +506,24 @@ func TestLauncher_Pkger(t *testing.T) { t.Log("\tvalidate all resources are created") { bkt := resourceCheck.mustGetBucket(t, byName("non_existent_bucket")) - assert.NotEqual(t, initialSum.Buckets[0].ID, sum.Buckets[0].ID) assert.Equal(t, pkger.SafeID(bkt.ID), sum.Buckets[0].ID) chk := resourceCheck.mustGetCheck(t, byName("non_existent_check")) - assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), sum.Checks[0].Check.GetID()) assert.Equal(t, chk.GetID(), sum.Checks[0].Check.GetID()) endpoint := resourceCheck.mustGetEndpoint(t, byName("non_existent_endpoint")) - assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), endpoint.GetID()) assert.Equal(t, endpoint.GetID(), sum.NotificationEndpoints[0].NotificationEndpoint.GetID()) label := resourceCheck.mustGetLabel(t, byName("non_existent_label")) - assert.NotEqual(t, initialSum.Labels[0].ID, sum.Labels[0].ID) assert.Equal(t, pkger.SafeID(label.ID), sum.Labels[0].ID) + task := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, "non_existent_task")) + assert.Equal(t, pkger.SafeID(task.ID), sum.Tasks[0].ID) + + tele := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, "non_existent_tele")) + assert.Equal(t, tele.ID, sum.TelegrafConfigs[0].TelegrafConfig.ID) + variable := resourceCheck.mustGetVariable(t, byName("non_existent_var")) - assert.NotEqual(t, initialSum.Variables[0].ID, sum.Variables[0].ID) assert.Equal(t, pkger.SafeID(variable.ID), sum.Variables[0].ID) } @@ -495,6 +541,12 @@ func TestLauncher_Pkger(t *testing.T) { _, err = resourceCheck.getLabel(t, byName(updateLabelName)) require.Error(t, err) + _, err = resourceCheck.getTask(t, byNameAndOrg(l.Org.ID, updateTaskName)) + require.Error(t, err) + + _, err = resourceCheck.getTelegrafConfig(t, byNameAndOrg(l.Org.ID, updateTelegrafName)) + require.Error(t, err) + _, err = resourceCheck.getVariable(t, byName(updateVariableName)) require.Error(t, err) } @@ -777,7 +829,7 @@ spec: assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID) assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name) assert.Equal(t, "desc", teles[0].TelegrafConfig.Description) - assert.Equal(t, telConf, teles[0].TelegrafConfig.Config) + assert.Equal(t, telegrafCfg, teles[0].TelegrafConfig.Config) vars := sum1.Variables require.Len(t, vars, 1) @@ -895,7 +947,7 @@ spec: require.Len(t, teles, 1) assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name) assert.Equal(t, "desc", teles[0].TelegrafConfig.Description) - assert.Equal(t, telConf, teles[0].TelegrafConfig.Config) + assert.Equal(t, telegrafCfg, teles[0].TelegrafConfig.Config) vars := sum.Variables require.Len(t, vars, 1) @@ -1508,7 +1560,7 @@ func newPkg(t *testing.T) *pkger.Pkg { return pkg } -const telConf = `[agent] +const telegrafCfg = `[agent] interval = "10s" metric_batch_size = 1000 metric_buffer_limit = 10000 @@ -1719,7 +1771,7 @@ spec: associations: - kind: Label name: label_1 -`, pkger.APIVersion, telConf) +`, pkger.APIVersion, telegrafCfg) var updatePkgYMLStr = fmt.Sprintf(` apiVersion: %[1]s @@ -2084,7 +2136,7 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta err error ) switch opt := getOpt(); { - case opt.name != "" || opt.orgID != 0: + case opt.name != "" && opt.orgID != 0: var filter influxdb.TaskFilter if opt.name != "" { filter.Name = &opt.name @@ -2097,7 +2149,7 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta return http.Task{}, err } for _, tt := range tasks { - if opt.name != "" && tt.Name == opt.name { + if tt.Name == opt.name { task = &tasks[0] break } @@ -2105,7 +2157,7 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta case opt.id != 0: task, err = taskSVC.FindTaskByID(timedCtx(time.Second), opt.id) default: - require.Fail(t, "did not provide any get option") + require.Fail(t, "did not provide a valid get option") } if task == nil { return http.Task{}, errors.New("did not find expected task by name") @@ -2127,6 +2179,51 @@ func (r resourceChecker) mustDeleteTask(t *testing.T, id influxdb.ID) { require.NoError(t, r.tl.TaskService(t).DeleteTask(ctx, id)) } +func (r resourceChecker) getTelegrafConfig(t *testing.T, getOpt getResourceOptFn) (influxdb.TelegrafConfig, error) { + t.Helper() + + teleSVC := r.tl.TelegrafService(t) + + var ( + config *influxdb.TelegrafConfig + err error + ) + switch opt := getOpt(); { + case opt.name != "" && opt.orgID != 0: + teles, _, _ := teleSVC.FindTelegrafConfigs(timedCtx(time.Second), influxdb.TelegrafConfigFilter{ + OrgID: &opt.orgID, + }) + for _, tt := range teles { + if opt.name != "" && tt.Name == opt.name { + config = teles[0] + break + } + } + case opt.id != 0: + config, err = teleSVC.FindTelegrafConfigByID(timedCtx(time.Second), opt.id) + default: + require.Fail(t, "did not provide a valid get option") + } + if config == nil { + return influxdb.TelegrafConfig{}, errors.New("did not find expected telegraf by name") + } + + return *config, err +} + +func (r resourceChecker) mustGetTelegrafConfig(t *testing.T, getOpt getResourceOptFn) influxdb.TelegrafConfig { + t.Helper() + + tele, err := r.getTelegrafConfig(t, getOpt) + require.NoError(t, err) + return tele +} + +func (r resourceChecker) mustDeleteTelegrafConfig(t *testing.T, id influxdb.ID) { + t.Helper() + require.NoError(t, r.tl.TelegrafService(t).DeleteTelegrafConfig(ctx, id)) +} + func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (influxdb.Variable, error) { t.Helper() diff --git a/pkger/clone_resource.go b/pkger/clone_resource.go index 8c4abef1f8..cb71dcc701 100644 --- a/pkger/clone_resource.go +++ b/pkger/clone_resource.go @@ -259,7 +259,7 @@ func (ex *resourceExporter) resourceCloneToKind(ctx context.Context, r ResourceT if err != nil { return err } - mapResource(t.OrgID, t.ID, KindTelegraf, telegrafToObject(*t, r.Name)) + mapResource(t.OrgID, t.ID, KindTelegraf, TelegrafToObject(r.Name, *t)) case r.Kind.is(KindVariable): v, err := ex.varSVC.FindVariableByID(ctx, r.ID) if err != nil { @@ -913,7 +913,7 @@ func NotificationRuleToObject(name, endpointPkgName string, iRule influxdb.Notif // regex used to rip out the hard coded task option stuffs var taskFluxRegex = regexp.MustCompile(`option task = {(.|\n)*?}`) -// TaskToObject coverts an influxdb.Task into a pkger.Object. +// TaskToObject converts an influxdb.Task into a pkger.Object. func TaskToObject(name string, t influxdb.Task) Object { if name == "" { name = t.Name @@ -932,7 +932,8 @@ func TaskToObject(name string, t influxdb.Task) Object { return o } -func telegrafToObject(t influxdb.TelegrafConfig, name string) Object { +// TelegrafToObject converts an influxdb.TelegrafConfig into a pkger.Object. +func TelegrafToObject(name string, t influxdb.TelegrafConfig) Object { if name == "" { name = t.Name } diff --git a/pkger/service.go b/pkger/service.go index 325747680d..c912b25eda 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -684,6 +684,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts s.dryRunDashboards(ctx, orgID, state.mDashboards) s.dryRunLabels(ctx, orgID, state.mLabels) s.dryRunTasks(ctx, orgID, state.mTasks) + s.dryRunTelegrafConfigs(ctx, orgID, state.mTelegrafs) s.dryRunVariables(ctx, orgID, state.mVariables) err := s.dryRunNotificationEndpoints(ctx, orgID, state.mEndpoints) if err != nil { @@ -864,6 +865,20 @@ func (s *Service) dryRunTasks(ctx context.Context, orgID influxdb.ID, tasks map[ } } +func (s *Service) dryRunTelegrafConfigs(ctx context.Context, orgID influxdb.ID, teleConfigs map[string]*stateTelegraf) { + for _, stateTele := range teleConfigs { + stateTele.orgID = orgID + var existing *influxdb.TelegrafConfig + if stateTele.ID() != 0 { + existing, _ = s.teleSVC.FindTelegrafConfigByID(ctx, stateTele.ID()) + } + if IsNew(stateTele.stateStatus) && existing != nil { + stateTele.stateStatus = StateStatusExists + } + stateTele.existing = existing + } +} + func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, vars map[string]*stateVariable) { existingVars, _ := s.getAllPlatformVariables(ctx, orgID) @@ -1175,7 +1190,7 @@ func (s *Service) applyState(ctx context.Context, coordinator *rollbackCoordinat s.applyDashboards(ctx, state.dashboards()), s.applyNotificationEndpoints(ctx, userID, state.endpoints()), s.applyTasks(ctx, state.tasks()), - s.applyTelegrafs(state.telegrafConfigs()), + s.applyTelegrafs(ctx, userID, state.telegrafConfigs()), }, } @@ -2070,29 +2085,29 @@ func (s *Service) rollbackTasks(ctx context.Context, tasks []*stateTask) error { return nil } -func (s *Service) applyTelegrafs(teles []*stateTelegraf) applier { +func (s *Service) applyTelegrafs(ctx context.Context, userID influxdb.ID, teles []*stateTelegraf) applier { const resource = "telegrafs" mutex := new(doMutex) rollbackTelegrafs := make([]*stateTelegraf, 0, len(teles)) createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { - var cfg influxdb.TelegrafConfig + var t *stateTelegraf mutex.Do(func() { teles[i].orgID = orgID - cfg = teles[i].summarize().TelegrafConfig + t = teles[i] }) - err := s.teleSVC.CreateTelegrafConfig(ctx, &cfg, userID) + existing, err := s.applyTelegrafConfig(ctx, userID, t) if err != nil { return &applyErrBody{ - name: cfg.Name, + name: t.parserTelegraf.Name(), msg: err.Error(), } } mutex.Do(func() { - teles[i].id = cfg.ID + teles[i].id = existing.ID rollbackTelegrafs = append(rollbackTelegrafs, teles[i]) }) @@ -2107,14 +2122,65 @@ func (s *Service) applyTelegrafs(teles []*stateTelegraf) applier { rollbacker: rollbacker{ resource: resource, fn: func(_ influxdb.ID) error { - return s.deleteByIDs("telegraf", len(rollbackTelegrafs), s.teleSVC.DeleteTelegrafConfig, func(i int) influxdb.ID { - return rollbackTelegrafs[i].ID() - }) + return s.rollbackTelegrafConfigs(ctx, userID, rollbackTelegrafs) }, }, } } +func (s *Service) applyTelegrafConfig(ctx context.Context, userID influxdb.ID, t *stateTelegraf) (influxdb.TelegrafConfig, error) { + switch t.stateStatus { + case StateStatusRemove: + if err := s.teleSVC.DeleteTelegrafConfig(ctx, t.ID()); err != nil { + return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to delete config") + } + return *t.existing, nil + case StateStatusExists: + cfg := t.summarize().TelegrafConfig + updatedConfig, err := s.teleSVC.UpdateTelegrafConfig(ctx, t.ID(), &cfg, userID) + if err != nil { + return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to update config") + } + return *updatedConfig, nil + default: + cfg := t.summarize().TelegrafConfig + err := s.teleSVC.CreateTelegrafConfig(ctx, &cfg, userID) + if err != nil { + return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to create telegraf config") + } + return cfg, nil + } +} + +func (s *Service) rollbackTelegrafConfigs(ctx context.Context, userID influxdb.ID, cfgs []*stateTelegraf) error { + rollbackFn := func(t *stateTelegraf) error { + var err error + switch t.stateStatus { + case StateStatusRemove: + err = ierrors.Wrap(s.teleSVC.CreateTelegrafConfig(ctx, t.existing, userID), "rolling back removed telegraf config") + case StateStatusExists: + _, err = s.teleSVC.UpdateTelegrafConfig(ctx, t.ID(), t.existing, userID) + err = ierrors.Wrap(err, "rolling back updated telegraf config") + default: + err = ierrors.Wrap(s.teleSVC.DeleteTelegrafConfig(ctx, t.ID()), "rolling back created telegraf config") + } + return err + } + + var errs []string + for _, v := range cfgs { + if err := rollbackFn(v); err != nil { + errs = append(errs, fmt.Sprintf("error for variable[%q]: %s", v.ID(), err)) + } + } + + if len(errs) > 0 { + return errors.New(strings.Join(errs, "; ")) + } + + return nil +} + func (s *Service) applyVariables(ctx context.Context, vars []*stateVariable) applier { const resource = "variable" @@ -2295,23 +2361,6 @@ func (s *Service) rollbackLabelMappings(mappings []stateLabelMapping) error { return nil } -func (s *Service) deleteByIDs(resource string, numIDs int, deleteFn func(context.Context, influxdb.ID) error, iterFn func(int) influxdb.ID) error { - var errs []string - for i := range make([]struct{}, numIDs) { - id := iterFn(i) - err := deleteFn(context.Background(), id) - if err != nil { - errs = append(errs, id.String()) - } - } - - if len(errs) > 0 { - return fmt.Errorf(`%s_ids=[%s] err="unable to delete"`, resource, strings.Join(errs, ", ")) - } - - return nil -} - func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, state *stateCoordinator) error { stack, err := s.store.ReadStackByID(ctx, stackID) if err != nil { @@ -2385,6 +2434,17 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb. Name: t.parserTask.PkgName(), }) } + for _, t := range state.mTelegrafs { + if IsRemoval(t.stateStatus) { + continue + } + stackResources = append(stackResources, StackResource{ + APIVersion: APIVersion, + ID: t.ID(), + Kind: KindTelegraf, + Name: t.parserTelegraf.PkgName(), + }) + } for _, v := range state.mVariables { if IsRemoval(v.stateStatus) { continue @@ -2469,6 +2529,13 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb res.ID = t.existing.ID } } + for _, t := range state.mTelegrafs { + res, ok := existingResources[newKey(KindTelegraf, t.parserTelegraf.PkgName())] + if ok && res.ID != t.ID() { + hasChanges = true + res.ID = t.existing.ID + } + } for _, v := range state.mVariables { res, ok := existingResources[newKey(KindVariable, v.parserVar.PkgName())] if ok && res.ID != v.ID() {