feat(pkger): add stateful management for telegraf configs
parent
f518b2d075
commit
47e5facc81
|
|
@ -1106,10 +1106,11 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) error {
|
|||
}
|
||||
|
||||
if teles := sum.TelegrafConfigs; len(teles) > 0 {
|
||||
headers := []string{"ID", "Name", "Description"}
|
||||
headers := []string{"Package Name", "ID", "Resource Name", "Description"}
|
||||
tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string {
|
||||
t := teles[i]
|
||||
return []string{
|
||||
t.PkgName,
|
||||
t.TelegrafConfig.ID.String(),
|
||||
t.TelegrafConfig.Name,
|
||||
t.TelegrafConfig.Description,
|
||||
|
|
|
|||
|
|
@ -114,6 +114,16 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
return obj
|
||||
}
|
||||
|
||||
newTelegrafObject := func(pkgName, name, description string) pkger.Object {
|
||||
obj := pkger.TelegrafToObject("", influxdb.TelegrafConfig{
|
||||
Name: name,
|
||||
Description: description,
|
||||
Config: telegrafCfg,
|
||||
})
|
||||
obj.SetMetadataName(pkgName)
|
||||
return obj
|
||||
}
|
||||
|
||||
newVariableObject := func(pkgName, name, description string) pkger.Object {
|
||||
obj := pkger.VariableToObject("", influxdb.Variable{
|
||||
Name: name,
|
||||
|
|
@ -165,6 +175,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
initialEndpointPkgName = "endzo"
|
||||
initialLabelPkgName = "labelino"
|
||||
initialTaskPkgName = "tap"
|
||||
initialTelegrafPkgName = "teletype"
|
||||
initialVariablePkgName = "laces out dan"
|
||||
)
|
||||
initialPkg := newPkg(
|
||||
|
|
@ -174,6 +185,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
newEndpointHTTP(initialEndpointPkgName, "endpoint_0", "init desc"),
|
||||
newLabelObject(initialLabelPkgName, "label 1", "init desc", "#222eee"),
|
||||
newTaskObject(initialTaskPkgName, "task_0", "init desc"),
|
||||
newTelegrafObject(initialTelegrafPkgName, "tele_0", "init desc"),
|
||||
newVariableObject(initialVariablePkgName, "var char", "init desc"),
|
||||
)
|
||||
|
||||
|
|
@ -211,6 +223,11 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
assert.Equal(t, "task_0", sum.Tasks[0].Name)
|
||||
assert.Equal(t, "init desc", sum.Tasks[0].Description)
|
||||
|
||||
require.Len(t, sum.TelegrafConfigs, 1)
|
||||
assert.NotZero(t, sum.TelegrafConfigs[0].TelegrafConfig.ID)
|
||||
assert.Equal(t, "tele_0", sum.TelegrafConfigs[0].TelegrafConfig.Name)
|
||||
assert.Equal(t, "init desc", sum.TelegrafConfigs[0].TelegrafConfig.Description)
|
||||
|
||||
require.Len(t, sum.Variables, 1)
|
||||
assert.NotZero(t, sum.Variables[0].ID)
|
||||
assert.Equal(t, "var char", sum.Variables[0].Name)
|
||||
|
|
@ -233,9 +250,12 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
actualLabel := resourceCheck.mustGetLabel(t, byName("label 1"))
|
||||
assert.Equal(t, sum.Labels[0].ID, pkger.SafeID(actualLabel.ID))
|
||||
|
||||
actualTask := resourceCheck.mustGetTask(t, byName("task_0"))
|
||||
actualTask := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, "task_0"))
|
||||
assert.Equal(t, sum.Tasks[0].ID, pkger.SafeID(actualTask.ID))
|
||||
|
||||
actualTele := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, "tele_0"))
|
||||
assert.Equal(t, sum.TelegrafConfigs[0].TelegrafConfig.ID, actualTele.ID)
|
||||
|
||||
actualVar := resourceCheck.mustGetVariable(t, byName("var char"))
|
||||
assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID))
|
||||
}
|
||||
|
|
@ -248,6 +268,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
updateEndpointName = "new endpoint"
|
||||
updateLabelName = "new label"
|
||||
updateTaskName = "new task"
|
||||
updateTelegrafName = "new telegraf"
|
||||
updateVariableName = "new variable"
|
||||
)
|
||||
t.Run("apply pkg with stack id where resources change", func(t *testing.T) {
|
||||
|
|
@ -258,6 +279,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
newEndpointHTTP(initialEndpointPkgName, updateEndpointName, ""),
|
||||
newLabelObject(initialLabelPkgName, updateLabelName, "", ""),
|
||||
newTaskObject(initialTaskPkgName, updateTaskName, ""),
|
||||
newTelegrafObject(initialTelegrafPkgName, updateTelegrafName, ""),
|
||||
newVariableObject(initialVariablePkgName, updateVariableName, ""),
|
||||
)
|
||||
sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt)
|
||||
|
|
@ -288,6 +310,11 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
assert.Equal(t, initialSum.Tasks[0].ID, sum.Tasks[0].ID)
|
||||
assert.Equal(t, updateTaskName, sum.Tasks[0].Name)
|
||||
|
||||
require.Len(t, sum.TelegrafConfigs, 1)
|
||||
updatedTele := sum.TelegrafConfigs[0].TelegrafConfig
|
||||
assert.Equal(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, updatedTele.ID)
|
||||
assert.Equal(t, updateTelegrafName, updatedTele.Name)
|
||||
|
||||
require.Len(t, sum.Variables, 1)
|
||||
assert.Equal(t, initialSum.Variables[0].ID, sum.Variables[0].ID)
|
||||
assert.Equal(t, updateVariableName, sum.Variables[0].Name)
|
||||
|
|
@ -309,9 +336,12 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName))
|
||||
require.Equal(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID))
|
||||
|
||||
actualTask := resourceCheck.mustGetTask(t, byName(updateTaskName))
|
||||
actualTask := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, updateTaskName))
|
||||
require.Equal(t, initialSum.Tasks[0].ID, pkger.SafeID(actualTask.ID))
|
||||
|
||||
actualTelegraf := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, updateTelegrafName))
|
||||
require.Equal(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, actualTelegraf.ID)
|
||||
|
||||
actualVar := resourceCheck.mustGetVariable(t, byName(updateVariableName))
|
||||
assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID))
|
||||
}
|
||||
|
|
@ -349,6 +379,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
newCheckDeadmanObject(t, "z_check", "", time.Hour),
|
||||
newEndpointHTTP("z_endpoint_rolls_back", "", ""),
|
||||
newTaskObject("z_task_rolls_back", "", ""),
|
||||
newTelegrafObject("z_telegraf_rolls_back", "", ""),
|
||||
newVariableObject("z_var_rolls_back", "", ""),
|
||||
)
|
||||
_, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt)
|
||||
|
|
@ -374,6 +405,9 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
actualTask := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, updateTaskName))
|
||||
assert.NotEqual(t, initialSum.Tasks[0].ID, pkger.SafeID(actualTask.ID))
|
||||
|
||||
actualTelegraf := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, updateTelegrafName))
|
||||
require.NotEqual(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, actualTelegraf.ID)
|
||||
|
||||
actualVariable := resourceCheck.mustGetVariable(t, byName(updateVariableName))
|
||||
assert.NotEqual(t, initialSum.Variables[0].ID, pkger.SafeID(actualVariable.ID))
|
||||
}
|
||||
|
|
@ -397,6 +431,9 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
_, err = resourceCheck.getLabel(t, byName("z_label_roller"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getTelegrafConfig(t, byNameAndOrg(l.Org.ID, "z_telegraf_rolls_back"))
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getVariable(t, byName("z_var_rolls_back"))
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
|
@ -410,6 +447,7 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
newEndpointHTTP("non_existent_endpoint", "", ""),
|
||||
newLabelObject("non_existent_label", "", "", ""),
|
||||
newTaskObject("non_existent_task", "", ""),
|
||||
newTelegrafObject("non_existent_tele", "", ""),
|
||||
newVariableObject("non_existent_var", "", ""),
|
||||
)
|
||||
sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt)
|
||||
|
|
@ -452,6 +490,13 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
defer resourceCheck.mustDeleteTask(t, influxdb.ID(sum.Tasks[0].ID))
|
||||
assert.Equal(t, "non_existent_task", sum.Tasks[0].Name)
|
||||
|
||||
require.Len(t, sum.TelegrafConfigs, 1)
|
||||
newTele := sum.TelegrafConfigs[0].TelegrafConfig
|
||||
assert.NotEqual(t, initialSum.TelegrafConfigs[0].TelegrafConfig.ID, newTele.ID)
|
||||
assert.NotZero(t, newTele.ID)
|
||||
defer resourceCheck.mustDeleteTelegrafConfig(t, newTele.ID)
|
||||
assert.Equal(t, "non_existent_tele", newTele.Name)
|
||||
|
||||
require.Len(t, sum.Variables, 1)
|
||||
assert.NotEqual(t, initialSum.Variables[0].ID, sum.Variables[0].ID)
|
||||
assert.NotZero(t, sum.Variables[0].ID)
|
||||
|
|
@ -461,23 +506,24 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
t.Log("\tvalidate all resources are created")
|
||||
{
|
||||
bkt := resourceCheck.mustGetBucket(t, byName("non_existent_bucket"))
|
||||
assert.NotEqual(t, initialSum.Buckets[0].ID, sum.Buckets[0].ID)
|
||||
assert.Equal(t, pkger.SafeID(bkt.ID), sum.Buckets[0].ID)
|
||||
|
||||
chk := resourceCheck.mustGetCheck(t, byName("non_existent_check"))
|
||||
assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), sum.Checks[0].Check.GetID())
|
||||
assert.Equal(t, chk.GetID(), sum.Checks[0].Check.GetID())
|
||||
|
||||
endpoint := resourceCheck.mustGetEndpoint(t, byName("non_existent_endpoint"))
|
||||
assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), endpoint.GetID())
|
||||
assert.Equal(t, endpoint.GetID(), sum.NotificationEndpoints[0].NotificationEndpoint.GetID())
|
||||
|
||||
label := resourceCheck.mustGetLabel(t, byName("non_existent_label"))
|
||||
assert.NotEqual(t, initialSum.Labels[0].ID, sum.Labels[0].ID)
|
||||
assert.Equal(t, pkger.SafeID(label.ID), sum.Labels[0].ID)
|
||||
|
||||
task := resourceCheck.mustGetTask(t, byNameAndOrg(l.Org.ID, "non_existent_task"))
|
||||
assert.Equal(t, pkger.SafeID(task.ID), sum.Tasks[0].ID)
|
||||
|
||||
tele := resourceCheck.mustGetTelegrafConfig(t, byNameAndOrg(l.Org.ID, "non_existent_tele"))
|
||||
assert.Equal(t, tele.ID, sum.TelegrafConfigs[0].TelegrafConfig.ID)
|
||||
|
||||
variable := resourceCheck.mustGetVariable(t, byName("non_existent_var"))
|
||||
assert.NotEqual(t, initialSum.Variables[0].ID, sum.Variables[0].ID)
|
||||
assert.Equal(t, pkger.SafeID(variable.ID), sum.Variables[0].ID)
|
||||
}
|
||||
|
||||
|
|
@ -495,6 +541,12 @@ func TestLauncher_Pkger(t *testing.T) {
|
|||
_, err = resourceCheck.getLabel(t, byName(updateLabelName))
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getTask(t, byNameAndOrg(l.Org.ID, updateTaskName))
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getTelegrafConfig(t, byNameAndOrg(l.Org.ID, updateTelegrafName))
|
||||
require.Error(t, err)
|
||||
|
||||
_, err = resourceCheck.getVariable(t, byName(updateVariableName))
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
|
@ -777,7 +829,7 @@ spec:
|
|||
assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID)
|
||||
assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name)
|
||||
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
|
||||
assert.Equal(t, telConf, teles[0].TelegrafConfig.Config)
|
||||
assert.Equal(t, telegrafCfg, teles[0].TelegrafConfig.Config)
|
||||
|
||||
vars := sum1.Variables
|
||||
require.Len(t, vars, 1)
|
||||
|
|
@ -895,7 +947,7 @@ spec:
|
|||
require.Len(t, teles, 1)
|
||||
assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name)
|
||||
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
|
||||
assert.Equal(t, telConf, teles[0].TelegrafConfig.Config)
|
||||
assert.Equal(t, telegrafCfg, teles[0].TelegrafConfig.Config)
|
||||
|
||||
vars := sum.Variables
|
||||
require.Len(t, vars, 1)
|
||||
|
|
@ -1508,7 +1560,7 @@ func newPkg(t *testing.T) *pkger.Pkg {
|
|||
return pkg
|
||||
}
|
||||
|
||||
const telConf = `[agent]
|
||||
const telegrafCfg = `[agent]
|
||||
interval = "10s"
|
||||
metric_batch_size = 1000
|
||||
metric_buffer_limit = 10000
|
||||
|
|
@ -1719,7 +1771,7 @@ spec:
|
|||
associations:
|
||||
- kind: Label
|
||||
name: label_1
|
||||
`, pkger.APIVersion, telConf)
|
||||
`, pkger.APIVersion, telegrafCfg)
|
||||
|
||||
var updatePkgYMLStr = fmt.Sprintf(`
|
||||
apiVersion: %[1]s
|
||||
|
|
@ -2084,7 +2136,7 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta
|
|||
err error
|
||||
)
|
||||
switch opt := getOpt(); {
|
||||
case opt.name != "" || opt.orgID != 0:
|
||||
case opt.name != "" && opt.orgID != 0:
|
||||
var filter influxdb.TaskFilter
|
||||
if opt.name != "" {
|
||||
filter.Name = &opt.name
|
||||
|
|
@ -2097,7 +2149,7 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta
|
|||
return http.Task{}, err
|
||||
}
|
||||
for _, tt := range tasks {
|
||||
if opt.name != "" && tt.Name == opt.name {
|
||||
if tt.Name == opt.name {
|
||||
task = &tasks[0]
|
||||
break
|
||||
}
|
||||
|
|
@ -2105,7 +2157,7 @@ func (r resourceChecker) getTask(t *testing.T, getOpt getResourceOptFn) (http.Ta
|
|||
case opt.id != 0:
|
||||
task, err = taskSVC.FindTaskByID(timedCtx(time.Second), opt.id)
|
||||
default:
|
||||
require.Fail(t, "did not provide any get option")
|
||||
require.Fail(t, "did not provide a valid get option")
|
||||
}
|
||||
if task == nil {
|
||||
return http.Task{}, errors.New("did not find expected task by name")
|
||||
|
|
@ -2127,6 +2179,51 @@ func (r resourceChecker) mustDeleteTask(t *testing.T, id influxdb.ID) {
|
|||
require.NoError(t, r.tl.TaskService(t).DeleteTask(ctx, id))
|
||||
}
|
||||
|
||||
func (r resourceChecker) getTelegrafConfig(t *testing.T, getOpt getResourceOptFn) (influxdb.TelegrafConfig, error) {
|
||||
t.Helper()
|
||||
|
||||
teleSVC := r.tl.TelegrafService(t)
|
||||
|
||||
var (
|
||||
config *influxdb.TelegrafConfig
|
||||
err error
|
||||
)
|
||||
switch opt := getOpt(); {
|
||||
case opt.name != "" && opt.orgID != 0:
|
||||
teles, _, _ := teleSVC.FindTelegrafConfigs(timedCtx(time.Second), influxdb.TelegrafConfigFilter{
|
||||
OrgID: &opt.orgID,
|
||||
})
|
||||
for _, tt := range teles {
|
||||
if opt.name != "" && tt.Name == opt.name {
|
||||
config = teles[0]
|
||||
break
|
||||
}
|
||||
}
|
||||
case opt.id != 0:
|
||||
config, err = teleSVC.FindTelegrafConfigByID(timedCtx(time.Second), opt.id)
|
||||
default:
|
||||
require.Fail(t, "did not provide a valid get option")
|
||||
}
|
||||
if config == nil {
|
||||
return influxdb.TelegrafConfig{}, errors.New("did not find expected telegraf by name")
|
||||
}
|
||||
|
||||
return *config, err
|
||||
}
|
||||
|
||||
func (r resourceChecker) mustGetTelegrafConfig(t *testing.T, getOpt getResourceOptFn) influxdb.TelegrafConfig {
|
||||
t.Helper()
|
||||
|
||||
tele, err := r.getTelegrafConfig(t, getOpt)
|
||||
require.NoError(t, err)
|
||||
return tele
|
||||
}
|
||||
|
||||
func (r resourceChecker) mustDeleteTelegrafConfig(t *testing.T, id influxdb.ID) {
|
||||
t.Helper()
|
||||
require.NoError(t, r.tl.TelegrafService(t).DeleteTelegrafConfig(ctx, id))
|
||||
}
|
||||
|
||||
func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (influxdb.Variable, error) {
|
||||
t.Helper()
|
||||
|
||||
|
|
|
|||
|
|
@ -259,7 +259,7 @@ func (ex *resourceExporter) resourceCloneToKind(ctx context.Context, r ResourceT
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mapResource(t.OrgID, t.ID, KindTelegraf, telegrafToObject(*t, r.Name))
|
||||
mapResource(t.OrgID, t.ID, KindTelegraf, TelegrafToObject(r.Name, *t))
|
||||
case r.Kind.is(KindVariable):
|
||||
v, err := ex.varSVC.FindVariableByID(ctx, r.ID)
|
||||
if err != nil {
|
||||
|
|
@ -913,7 +913,7 @@ func NotificationRuleToObject(name, endpointPkgName string, iRule influxdb.Notif
|
|||
// regex used to rip out the hard coded task option stuffs
|
||||
var taskFluxRegex = regexp.MustCompile(`option task = {(.|\n)*?}`)
|
||||
|
||||
// TaskToObject coverts an influxdb.Task into a pkger.Object.
|
||||
// TaskToObject converts an influxdb.Task into a pkger.Object.
|
||||
func TaskToObject(name string, t influxdb.Task) Object {
|
||||
if name == "" {
|
||||
name = t.Name
|
||||
|
|
@ -932,7 +932,8 @@ func TaskToObject(name string, t influxdb.Task) Object {
|
|||
return o
|
||||
}
|
||||
|
||||
func telegrafToObject(t influxdb.TelegrafConfig, name string) Object {
|
||||
// TelegrafToObject converts an influxdb.TelegrafConfig into a pkger.Object.
|
||||
func TelegrafToObject(name string, t influxdb.TelegrafConfig) Object {
|
||||
if name == "" {
|
||||
name = t.Name
|
||||
}
|
||||
|
|
|
|||
121
pkger/service.go
121
pkger/service.go
|
|
@ -684,6 +684,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts
|
|||
s.dryRunDashboards(ctx, orgID, state.mDashboards)
|
||||
s.dryRunLabels(ctx, orgID, state.mLabels)
|
||||
s.dryRunTasks(ctx, orgID, state.mTasks)
|
||||
s.dryRunTelegrafConfigs(ctx, orgID, state.mTelegrafs)
|
||||
s.dryRunVariables(ctx, orgID, state.mVariables)
|
||||
err := s.dryRunNotificationEndpoints(ctx, orgID, state.mEndpoints)
|
||||
if err != nil {
|
||||
|
|
@ -864,6 +865,20 @@ func (s *Service) dryRunTasks(ctx context.Context, orgID influxdb.ID, tasks map[
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) dryRunTelegrafConfigs(ctx context.Context, orgID influxdb.ID, teleConfigs map[string]*stateTelegraf) {
|
||||
for _, stateTele := range teleConfigs {
|
||||
stateTele.orgID = orgID
|
||||
var existing *influxdb.TelegrafConfig
|
||||
if stateTele.ID() != 0 {
|
||||
existing, _ = s.teleSVC.FindTelegrafConfigByID(ctx, stateTele.ID())
|
||||
}
|
||||
if IsNew(stateTele.stateStatus) && existing != nil {
|
||||
stateTele.stateStatus = StateStatusExists
|
||||
}
|
||||
stateTele.existing = existing
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, vars map[string]*stateVariable) {
|
||||
existingVars, _ := s.getAllPlatformVariables(ctx, orgID)
|
||||
|
||||
|
|
@ -1175,7 +1190,7 @@ func (s *Service) applyState(ctx context.Context, coordinator *rollbackCoordinat
|
|||
s.applyDashboards(ctx, state.dashboards()),
|
||||
s.applyNotificationEndpoints(ctx, userID, state.endpoints()),
|
||||
s.applyTasks(ctx, state.tasks()),
|
||||
s.applyTelegrafs(state.telegrafConfigs()),
|
||||
s.applyTelegrafs(ctx, userID, state.telegrafConfigs()),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -2070,29 +2085,29 @@ func (s *Service) rollbackTasks(ctx context.Context, tasks []*stateTask) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) applyTelegrafs(teles []*stateTelegraf) applier {
|
||||
func (s *Service) applyTelegrafs(ctx context.Context, userID influxdb.ID, teles []*stateTelegraf) applier {
|
||||
const resource = "telegrafs"
|
||||
|
||||
mutex := new(doMutex)
|
||||
rollbackTelegrafs := make([]*stateTelegraf, 0, len(teles))
|
||||
|
||||
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
|
||||
var cfg influxdb.TelegrafConfig
|
||||
var t *stateTelegraf
|
||||
mutex.Do(func() {
|
||||
teles[i].orgID = orgID
|
||||
cfg = teles[i].summarize().TelegrafConfig
|
||||
t = teles[i]
|
||||
})
|
||||
|
||||
err := s.teleSVC.CreateTelegrafConfig(ctx, &cfg, userID)
|
||||
existing, err := s.applyTelegrafConfig(ctx, userID, t)
|
||||
if err != nil {
|
||||
return &applyErrBody{
|
||||
name: cfg.Name,
|
||||
name: t.parserTelegraf.Name(),
|
||||
msg: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
mutex.Do(func() {
|
||||
teles[i].id = cfg.ID
|
||||
teles[i].id = existing.ID
|
||||
rollbackTelegrafs = append(rollbackTelegrafs, teles[i])
|
||||
})
|
||||
|
||||
|
|
@ -2107,14 +2122,65 @@ func (s *Service) applyTelegrafs(teles []*stateTelegraf) applier {
|
|||
rollbacker: rollbacker{
|
||||
resource: resource,
|
||||
fn: func(_ influxdb.ID) error {
|
||||
return s.deleteByIDs("telegraf", len(rollbackTelegrafs), s.teleSVC.DeleteTelegrafConfig, func(i int) influxdb.ID {
|
||||
return rollbackTelegrafs[i].ID()
|
||||
})
|
||||
return s.rollbackTelegrafConfigs(ctx, userID, rollbackTelegrafs)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) applyTelegrafConfig(ctx context.Context, userID influxdb.ID, t *stateTelegraf) (influxdb.TelegrafConfig, error) {
|
||||
switch t.stateStatus {
|
||||
case StateStatusRemove:
|
||||
if err := s.teleSVC.DeleteTelegrafConfig(ctx, t.ID()); err != nil {
|
||||
return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to delete config")
|
||||
}
|
||||
return *t.existing, nil
|
||||
case StateStatusExists:
|
||||
cfg := t.summarize().TelegrafConfig
|
||||
updatedConfig, err := s.teleSVC.UpdateTelegrafConfig(ctx, t.ID(), &cfg, userID)
|
||||
if err != nil {
|
||||
return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to update config")
|
||||
}
|
||||
return *updatedConfig, nil
|
||||
default:
|
||||
cfg := t.summarize().TelegrafConfig
|
||||
err := s.teleSVC.CreateTelegrafConfig(ctx, &cfg, userID)
|
||||
if err != nil {
|
||||
return influxdb.TelegrafConfig{}, ierrors.Wrap(err, "failed to create telegraf config")
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) rollbackTelegrafConfigs(ctx context.Context, userID influxdb.ID, cfgs []*stateTelegraf) error {
|
||||
rollbackFn := func(t *stateTelegraf) error {
|
||||
var err error
|
||||
switch t.stateStatus {
|
||||
case StateStatusRemove:
|
||||
err = ierrors.Wrap(s.teleSVC.CreateTelegrafConfig(ctx, t.existing, userID), "rolling back removed telegraf config")
|
||||
case StateStatusExists:
|
||||
_, err = s.teleSVC.UpdateTelegrafConfig(ctx, t.ID(), t.existing, userID)
|
||||
err = ierrors.Wrap(err, "rolling back updated telegraf config")
|
||||
default:
|
||||
err = ierrors.Wrap(s.teleSVC.DeleteTelegrafConfig(ctx, t.ID()), "rolling back created telegraf config")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var errs []string
|
||||
for _, v := range cfgs {
|
||||
if err := rollbackFn(v); err != nil {
|
||||
errs = append(errs, fmt.Sprintf("error for variable[%q]: %s", v.ID(), err))
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.New(strings.Join(errs, "; "))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) applyVariables(ctx context.Context, vars []*stateVariable) applier {
|
||||
const resource = "variable"
|
||||
|
||||
|
|
@ -2295,23 +2361,6 @@ func (s *Service) rollbackLabelMappings(mappings []stateLabelMapping) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) deleteByIDs(resource string, numIDs int, deleteFn func(context.Context, influxdb.ID) error, iterFn func(int) influxdb.ID) error {
|
||||
var errs []string
|
||||
for i := range make([]struct{}, numIDs) {
|
||||
id := iterFn(i)
|
||||
err := deleteFn(context.Background(), id)
|
||||
if err != nil {
|
||||
errs = append(errs, id.String())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return fmt.Errorf(`%s_ids=[%s] err="unable to delete"`, resource, strings.Join(errs, ", "))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, state *stateCoordinator) error {
|
||||
stack, err := s.store.ReadStackByID(ctx, stackID)
|
||||
if err != nil {
|
||||
|
|
@ -2385,6 +2434,17 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.
|
|||
Name: t.parserTask.PkgName(),
|
||||
})
|
||||
}
|
||||
for _, t := range state.mTelegrafs {
|
||||
if IsRemoval(t.stateStatus) {
|
||||
continue
|
||||
}
|
||||
stackResources = append(stackResources, StackResource{
|
||||
APIVersion: APIVersion,
|
||||
ID: t.ID(),
|
||||
Kind: KindTelegraf,
|
||||
Name: t.parserTelegraf.PkgName(),
|
||||
})
|
||||
}
|
||||
for _, v := range state.mVariables {
|
||||
if IsRemoval(v.stateStatus) {
|
||||
continue
|
||||
|
|
@ -2469,6 +2529,13 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb
|
|||
res.ID = t.existing.ID
|
||||
}
|
||||
}
|
||||
for _, t := range state.mTelegrafs {
|
||||
res, ok := existingResources[newKey(KindTelegraf, t.parserTelegraf.PkgName())]
|
||||
if ok && res.ID != t.ID() {
|
||||
hasChanges = true
|
||||
res.ID = t.existing.ID
|
||||
}
|
||||
}
|
||||
for _, v := range state.mVariables {
|
||||
res, ok := existingResources[newKey(KindVariable, v.parserVar.PkgName())]
|
||||
if ok && res.ID != v.ID() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue