parent
a86c7d7db6
commit
da2cf7e94a
@@ -7765,8 +7765,8 @@ components:
      items:
        type: object
        properties:
          remove:
            type: boolean
          stateStatus:
            type: string
          id:
            type: string
          pkgName:
pkger/models.go
@@ -5,7 +5,6 @@ import (
    "errors"
    "fmt"
    "reflect"
    "regexp"
    "sort"
    "strings"
    "time"
@@ -510,41 +509,6 @@ type (
    }
)

func newDiffTask(t *task) DiffTask {
    diff := DiffTask{
        DiffIdentifier: DiffIdentifier{
            ID:      SafeID(t.ID()),
            Remove:  t.shouldRemove,
            PkgName: t.PkgName(),
        },
        New: DiffTaskValues{
            Name:        t.Name(),
            Cron:        t.cron,
            Description: t.description,
            Every:       durToStr(t.every),
            Offset:      durToStr(t.offset),
            Query:       t.query,
            Status:      t.Status(),
        },
    }

    if !t.Exists() {
        return diff
    }

    diff.Old = &DiffTaskValues{
        Name:        t.existing.Name,
        Cron:        t.existing.Cron,
        Description: t.existing.Description,
        Every:       t.existing.Every,
        Offset:      t.existing.Offset.String(),
        Query:       t.existing.Flux,
        Status:      influxdb.Status(t.existing.Status),
    }

    return diff
}

// DiffTelegraf is a diff of an individual telegraf. This resource is always new.
type DiffTelegraf struct {
    DiffIdentifier
@@ -1118,150 +1082,6 @@ func (r mapperNotificationRules) Len() int {
    return len(r)
}

const (
    fieldTaskCron = "cron"
)

type task struct {
    identity

    id          influxdb.ID
    orgID       influxdb.ID
    cron        string
    description string
    every       time.Duration
    offset      time.Duration
    query       string
    status      string

    labels sortedLabels

    existing *influxdb.Task
}

func (t *task) Exists() bool {
    return t.existing != nil
}

func (t *task) ID() influxdb.ID {
    if t.existing != nil {
        return t.existing.ID
    }
    return t.id
}

func (t *task) Labels() []*label {
    return t.labels
}

func (t *task) ResourceType() influxdb.ResourceType {
    return KindTask.ResourceType()
}

func (t *task) Status() influxdb.Status {
    if t.status == "" {
        return influxdb.Active
    }
    return influxdb.Status(t.status)
}

var fluxRegex = regexp.MustCompile(`import\s+\".*\"`)

func (t *task) flux() string {
    taskOpts := []string{fmt.Sprintf("name: %q", t.Name())}
    if t.cron != "" {
        taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", t.cron))
    }
    if t.every > 0 {
        taskOpts = append(taskOpts, fmt.Sprintf("every: %s", t.every))
    }
    if t.offset > 0 {
        taskOpts = append(taskOpts, fmt.Sprintf("offset: %s", t.offset))
    }

    // this is required by the API, super nasty. Will be super challenging for
    // anyone outside org to figure out how to do this within an hour of looking
    // at the API :sadpanda:. Would be ideal to let the API translate the arguments
    // into this required form instead of forcing that complexity on the caller.
    taskOptStr := fmt.Sprintf("\noption task = { %s }", strings.Join(taskOpts, ", "))

    if indices := fluxRegex.FindAllIndex([]byte(t.query), -1); len(indices) > 0 {
        lastImportIdx := indices[len(indices)-1][1]
        pieces := append([]string{},
            t.query[:lastImportIdx],
            taskOptStr,
            t.query[lastImportIdx:],
        )
        return fmt.Sprint(strings.Join(pieces, "\n"))
    }

    return fmt.Sprintf("%s\n%s", taskOptStr, t.query)
}

func (t *task) summarize() SummaryTask {
    return SummaryTask{
        ID:          SafeID(t.ID()),
        PkgName:     t.PkgName(),
        Name:        t.Name(),
        Cron:        t.cron,
        Description: t.description,
        Every:       durToStr(t.every),
        Offset:      durToStr(t.offset),
        Query:       t.query,
        Status:      t.Status(),

        LabelAssociations: toSummaryLabels(t.labels...),
    }
}

func (t *task) valid() []validationErr {
    var vErrs []validationErr
    if t.cron == "" && t.every == 0 {
        vErrs = append(vErrs,
            validationErr{
                Field: fieldEvery,
                Msg:   "must provide if cron field is not provided",
            },
            validationErr{
                Field: fieldTaskCron,
                Msg:   "must provide if every field is not provided",
            },
        )
    }

    if t.query == "" {
        vErrs = append(vErrs, validationErr{
            Field: fieldQuery,
            Msg:   "must provide a non zero value",
        })
    }

    if status := t.Status(); status != influxdb.Active && status != influxdb.Inactive {
        vErrs = append(vErrs, validationErr{
            Field: fieldStatus,
            Msg:   "must be 1 of [active, inactive]",
        })
    }

    if len(vErrs) > 0 {
        return []validationErr{
            objectValidationErr(fieldSpec, vErrs...),
        }
    }

    return nil
}

type mapperTasks []*task

func (m mapperTasks) Association(i int) labelAssociater {
    return m[i]
}

func (m mapperTasks) Len() int {
    return len(m)
}

const (
    fieldTelegrafConfig = "config"
)
@@ -491,11 +491,6 @@ func TestPkg(t *testing.T) {
            kind:      KindNotificationRule,
            validName: "rule_UUID",
        },
        {
            pkgFile:   "testdata/tasks.yml",
            kind:      KindTask,
            validName: "task_UUID",
        },
        {
            pkgFile:   "testdata/telegraf.yml",
            kind:      KindTelegraf,
@@ -424,11 +424,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) {
            identity: newIdentity,
            id:       id,
        }
    case KindTask:
        p.mTasks[pkgName] = &task{
            identity: newIdentity,
            id:       id,
        }
    case KindTelegraf:
        p.mTelegrafs[pkgName] = &telegraf{
            identity: newIdentity,
@@ -454,11 +449,6 @@ func (p *Pkg) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool
        return func(id influxdb.ID) {
            r.id = id
        }, ok
    case KindTask:
        t, ok := p.mTasks[pkgName]
        return func(id influxdb.ID) {
            t.id = id
        }, ok
    case KindTelegraf:
        t, ok := p.mTelegrafs[pkgName]
        return func(id influxdb.ID) {
@@ -3,6 +3,7 @@ package pkger
import (
    "fmt"
    "net/url"
    "regexp"
    "strconv"
    "strings"
    "time"
@@ -472,108 +473,6 @@ func (s sortedLabels) Swap(i, j int) {
    s[i], s[j] = s[j], s[i]
}

const (
    fieldArgTypeConstant = "constant"
    fieldArgTypeMap      = "map"
    fieldArgTypeQuery    = "query"
)

type variable struct {
    identity

    Description string
    Type        string
    Query       string
    Language    string
    ConstValues []string
    MapValues   map[string]string

    labels sortedLabels
}

func (v *variable) Labels() []*label {
    return v.labels
}

func (v *variable) ResourceType() influxdb.ResourceType {
    return KindVariable.ResourceType()
}

func (v *variable) summarize() SummaryVariable {
    return SummaryVariable{
        PkgName:           v.PkgName(),
        Name:              v.Name(),
        Description:       v.Description,
        Arguments:         v.influxVarArgs(),
        LabelAssociations: toSummaryLabels(v.labels...),
    }
}

func (v *variable) influxVarArgs() *influxdb.VariableArguments {
    // this zero value check is for situations where we want to marshal/unmarshal
    // a variable and not have the invalid args blow up during unmarshaling. When
    // that validation is decoupled from the unmarshaling, we can clean this up.
    if v.Type == "" {
        return nil
    }

    args := &influxdb.VariableArguments{
        Type: v.Type,
    }
    switch args.Type {
    case "query":
        args.Values = influxdb.VariableQueryValues{
            Query:    v.Query,
            Language: v.Language,
        }
    case "constant":
        args.Values = influxdb.VariableConstantValues(v.ConstValues)
    case "map":
        args.Values = influxdb.VariableMapValues(v.MapValues)
    }
    return args
}

func (v *variable) valid() []validationErr {
    var failures []validationErr
    switch v.Type {
    case "map":
        if len(v.MapValues) == 0 {
            failures = append(failures, validationErr{
                Field: fieldValues,
                Msg:   "map variable must have at least 1 key/val pair",
            })
        }
    case "constant":
        if len(v.ConstValues) == 0 {
            failures = append(failures, validationErr{
                Field: fieldValues,
                Msg:   "constant variable must have a least 1 value provided",
            })
        }
    case "query":
        if v.Query == "" {
            failures = append(failures, validationErr{
                Field: fieldQuery,
                Msg:   "query variable must provide a query string",
            })
        }
        if v.Language != "influxql" && v.Language != "flux" {
            failures = append(failures, validationErr{
                Field: fieldLanguage,
                Msg:   fmt.Sprintf(`query variable language must be either "influxql" or "flux"; got %q`, v.Language),
            })
        }
    }
    if len(failures) > 0 {
        return []validationErr{
            objectValidationErr(fieldSpec, failures...),
        }
    }

    return nil
}

type thresholdType string

const (
@@ -844,3 +743,257 @@ func (n *notificationEndpoint) valid() []validationErr {

    return nil
}

const (
    fieldTaskCron = "cron"
)

type task struct {
    identity

    cron        string
    description string
    every       time.Duration
    offset      time.Duration
    query       string
    status      string

    labels sortedLabels
}

func (t *task) Labels() []*label {
    return t.labels
}

func (t *task) ResourceType() influxdb.ResourceType {
    return KindTask.ResourceType()
}

func (t *task) Status() influxdb.Status {
    if t.status == "" {
        return influxdb.Active
    }
    return influxdb.Status(t.status)
}

func (t *task) flux() string {
    translator := taskFluxTranslation{
        name:     t.Name(),
        cron:     t.cron,
        every:    t.every,
        offset:   t.offset,
        rawQuery: t.query,
    }
    return translator.flux()
}

func (t *task) summarize() SummaryTask {
    return SummaryTask{
        PkgName:     t.PkgName(),
        Name:        t.Name(),
        Cron:        t.cron,
        Description: t.description,
        Every:       durToStr(t.every),
        Offset:      durToStr(t.offset),
        Query:       t.query,
        Status:      t.Status(),

        LabelAssociations: toSummaryLabels(t.labels...),
    }
}

func (t *task) valid() []validationErr {
    var vErrs []validationErr
    if t.cron == "" && t.every == 0 {
        vErrs = append(vErrs,
            validationErr{
                Field: fieldEvery,
                Msg:   "must provide if cron field is not provided",
            },
            validationErr{
                Field: fieldTaskCron,
                Msg:   "must provide if every field is not provided",
            },
        )
    }

    if t.query == "" {
        vErrs = append(vErrs, validationErr{
            Field: fieldQuery,
            Msg:   "must provide a non zero value",
        })
    }

    if status := t.Status(); status != influxdb.Active && status != influxdb.Inactive {
        vErrs = append(vErrs, validationErr{
            Field: fieldStatus,
            Msg:   "must be 1 of [active, inactive]",
        })
    }

    if len(vErrs) > 0 {
        return []validationErr{
            objectValidationErr(fieldSpec, vErrs...),
        }
    }

    return nil
}

var fluxRegex = regexp.MustCompile(`import\s+\".*\"`)

type taskFluxTranslation struct {
    name   string
    cron   string
    every  time.Duration
    offset time.Duration

    rawQuery string
}

func (tft taskFluxTranslation) flux() string {
    var sb strings.Builder
    writeLine := func(s string) {
        sb.WriteString(s + "\n")
    }

    imports, queryBody := tft.separateQueryImports()
    if imports != "" {
        writeLine(imports + "\n")
    }

    writeLine(tft.generateTaskOption())
    sb.WriteString(queryBody)

    return sb.String()
}

func (tft taskFluxTranslation) separateQueryImports() (imports string, querySansImports string) {
    if indices := fluxRegex.FindAllIndex([]byte(tft.rawQuery), -1); len(indices) > 0 {
        lastImportIdx := indices[len(indices)-1][1]
        return tft.rawQuery[:lastImportIdx], tft.rawQuery[lastImportIdx:]
    }

    return "", tft.rawQuery
}

func (tft taskFluxTranslation) generateTaskOption() string {
    taskOpts := []string{fmt.Sprintf("name: %q", tft.name)}
    if tft.cron != "" {
        taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", tft.cron))
    }
    if tft.every > 0 {
        taskOpts = append(taskOpts, fmt.Sprintf("every: %s", tft.every))
    }
    if tft.offset > 0 {
        taskOpts = append(taskOpts, fmt.Sprintf("offset: %s", tft.offset))
    }

    // this is required by the API, super nasty. Will be super challenging for
    // anyone outside org to figure out how to do this within an hour of looking
    // at the API :sadpanda:. Would be ideal to let the API translate the arguments
    // into this required form instead of forcing that complexity on the caller.
    return fmt.Sprintf("option task = { %s }", strings.Join(taskOpts, ", "))
}
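To make the comment above concrete, here is a minimal illustrative sketch (not part of this change) of the Flux this translation emits. It assumes it sits in the pkger package next to taskFluxTranslation; the task name, import path, bucket, and query are made up.

package pkger

import (
    "fmt"
    "time"
)

// exampleTaskFluxTranslation is illustrative only: it shows the shape of the
// Flux emitted for a hypothetical task, with the option line injected after
// any imports found in the raw query.
func exampleTaskFluxTranslation() {
    tft := taskFluxTranslation{
        name:     "daily rollup",
        every:    time.Hour,
        offset:   10 * time.Minute,
        rawQuery: "import \"influxdata/influxdb/v1\"\nfrom(bucket: \"rucket\") |> range(start: -1h)",
    }
    fmt.Println(tft.flux())
    // Prints roughly:
    //
    //   import "influxdata/influxdb/v1"
    //
    //   option task = { name: "daily rollup", every: 1h0m0s, offset: 10m0s }
    //
    //   from(bucket: "rucket") |> range(start: -1h)
}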
const (
    fieldArgTypeConstant = "constant"
    fieldArgTypeMap      = "map"
    fieldArgTypeQuery    = "query"
)

type variable struct {
    identity

    Description string
    Type        string
    Query       string
    Language    string
    ConstValues []string
    MapValues   map[string]string

    labels sortedLabels
}

func (v *variable) Labels() []*label {
    return v.labels
}

func (v *variable) ResourceType() influxdb.ResourceType {
    return KindVariable.ResourceType()
}

func (v *variable) summarize() SummaryVariable {
    return SummaryVariable{
        PkgName:           v.PkgName(),
        Name:              v.Name(),
        Description:       v.Description,
        Arguments:         v.influxVarArgs(),
        LabelAssociations: toSummaryLabels(v.labels...),
    }
}

func (v *variable) influxVarArgs() *influxdb.VariableArguments {
    // this zero value check is for situations where we want to marshal/unmarshal
    // a variable and not have the invalid args blow up during unmarshaling. When
    // that validation is decoupled from the unmarshaling, we can clean this up.
    if v.Type == "" {
        return nil
    }

    args := &influxdb.VariableArguments{
        Type: v.Type,
    }
    switch args.Type {
    case "query":
        args.Values = influxdb.VariableQueryValues{
            Query:    v.Query,
            Language: v.Language,
        }
    case "constant":
        args.Values = influxdb.VariableConstantValues(v.ConstValues)
    case "map":
        args.Values = influxdb.VariableMapValues(v.MapValues)
    }
    return args
}

func (v *variable) valid() []validationErr {
    var failures []validationErr
    switch v.Type {
    case "map":
        if len(v.MapValues) == 0 {
            failures = append(failures, validationErr{
                Field: fieldValues,
                Msg:   "map variable must have at least 1 key/val pair",
            })
        }
    case "constant":
        if len(v.ConstValues) == 0 {
            failures = append(failures, validationErr{
                Field: fieldValues,
                Msg:   "constant variable must have a least 1 value provided",
            })
        }
    case "query":
        if v.Query == "" {
            failures = append(failures, validationErr{
                Field: fieldQuery,
                Msg:   "query variable must provide a query string",
            })
        }
        if v.Language != "influxql" && v.Language != "flux" {
            failures = append(failures, validationErr{
                Field: fieldLanguage,
                Msg:   fmt.Sprintf(`query variable language must be either "influxql" or "flux"; got %q`, v.Language),
            })
        }
    }
    if len(failures) > 0 {
        return []validationErr{
            objectValidationErr(fieldSpec, failures...),
        }
    }

    return nil
}
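Similarly, a minimal illustrative sketch (not part of this change) of how a query-type variable maps into platform arguments through influxVarArgs above; it assumes the pkger package context, and the query text is made up.

package pkger

import "fmt"

// exampleVariableArgs is illustrative only: a query-type variable parsed from
// a package is mapped into VariableArguments by influxVarArgs above.
func exampleVariableArgs() {
    v := variable{
        Type:     fieldArgTypeQuery, // "query"
        Query:    `buckets() |> keep(columns: ["name"])`,
        Language: "flux",
    }
    args := v.influxVarArgs()
    fmt.Println(args.Type) // "query"
    // args.Values holds influxdb.VariableQueryValues{Query: v.Query, Language: "flux"}.
}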
@@ -687,7 +687,6 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts

    var diff Diff
    diff.Dashboards = s.dryRunDashboards(pkg)
    diff.Tasks = s.dryRunTasks(pkg)
    diff.Telegrafs = s.dryRunTelegraf(pkg)

    diffRules, err := s.dryRunNotificationRules(ctx, orgID, pkg)
@@ -714,6 +713,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts
    diff.Checks = stateDiff.Checks
    diff.NotificationEndpoints = stateDiff.NotificationEndpoints
    diff.Labels = stateDiff.Labels
    diff.Tasks = stateDiff.Tasks
    diff.Variables = stateDiff.Variables
    diff.LabelMappings = append(stateDiff.LabelMappings, diff.LabelMappings...)
@@ -897,14 +897,6 @@ func (s *Service) dryRunSecrets(ctx context.Context, orgID influxdb.ID, pkg *Pkg
    return nil
}

func (s *Service) dryRunTasks(pkg *Pkg) []DiffTask {
    var diffs []DiffTask
    for _, t := range pkg.tasks() {
        diffs = append(diffs, newDiffTask(t))
    }
    return diffs
}

func (s *Service) dryRunTelegraf(pkg *Pkg) []DiffTelegraf {
    telegrafs := pkg.telegrafs()
    diffs := make([]DiffTelegraf, 0, len(telegrafs))
@@ -960,7 +952,6 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg, state *stat
    mappers := []labelMappers{
        mapperDashboards(pkg.dashboards()),
        mapperNotificationRules(pkg.notificationRules()),
        mapperTasks(pkg.tasks()),
        mapperTelegrafs(pkg.telegrafs()),
    }
@@ -1098,6 +1089,17 @@ func (s *Service) dryRunLabelMappingsV2(ctx context.Context, state *stateCoordin
        mappings = append(mappings, mm...)
    }

    for _, t := range state.mTasks {
        if IsRemoval(t.stateStatus) {
            continue
        }
        mm, err := s.dryRunResourceLabelMappingV2(ctx, state, stateLabelsByResName, t)
        if err != nil {
            return nil, err
        }
        mappings = append(mappings, mm...)
    }

    for _, v := range state.mVariables {
        if IsRemoval(v.stateStatus) {
            continue
@@ -1183,6 +1185,7 @@ func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, pkg *P
        KindCheck,
        KindLabel,
        KindNotificationEndpoint,
        KindTask,
        KindVariable,
    }
@@ -1321,7 +1324,7 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o
            s.applyChecks(ctx, state.checks()),
            s.applyDashboards(pkg.dashboards()),
            s.applyNotificationEndpoints(ctx, userID, state.endpoints()),
            s.applyTasks(pkg.tasks()),
            s.applyTasks(state.tasks()),
            s.applyTelegrafs(pkg.telegrafs()),
        },
    }
@@ -2057,34 +2060,37 @@ func (s *Service) applySecrets(secrets map[string]string) applier {
    }
}

func (s *Service) applyTasks(tasks []*task) applier {
func (s *Service) applyTasks(tasks []*stateTask) applier {
    const resource = "tasks"

    mutex := new(doMutex)
    rollbackTasks := make([]task, 0, len(tasks))
    rollbackTasks := make([]*stateTask, 0, len(tasks))

    createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
        var t task
        var t *stateTask
        mutex.Do(func() {
            tasks[i].orgID = orgID
            t = *tasks[i]
            t = tasks[i]
        })

        newTask, err := s.taskSVC.CreateTask(ctx, influxdb.TaskCreate{
            Type: influxdb.TaskSystemType,
            Flux: t.flux(),
            Flux: t.parserTask.flux(),
            OwnerID: userID,
            Description: t.description,
            Status: string(t.Status()),
            Description: t.parserTask.description,
            Status: string(t.parserTask.Status()),
            OrganizationID: t.orgID,
        })
        if err != nil {
            return &applyErrBody{name: t.Name(), msg: err.Error()}
            return &applyErrBody{
                name: t.parserTask.Name(),
                msg:  err.Error(),
            }
        }

        mutex.Do(func() {
            tasks[i].id = newTask.ID
            rollbackTasks = append(rollbackTasks, *tasks[i])
            rollbackTasks = append(rollbackTasks, tasks[i])
        })

        return nil
@@ -2610,6 +2616,7 @@ func newSummaryFromStatePkg(pkg *Pkg, state *stateCoordinator) Summary {
    pkgSum.Checks = stateSum.Checks
    pkgSum.NotificationEndpoints = stateSum.NotificationEndpoints
    pkgSum.Labels = stateSum.Labels
    pkgSum.Tasks = stateSum.Tasks
    pkgSum.Variables = stateSum.Variables

    // filter out label mappings that are from pgk and replace with those
@@ -2618,6 +2625,7 @@ func newSummaryFromStatePkg(pkg *Pkg, state *stateCoordinator) Summary {
        influxdb.BucketsResourceType:              true,
        influxdb.ChecksResourceType:               true,
        influxdb.NotificationEndpointResourceType: true,
        influxdb.TasksResourceType:                true,
        influxdb.VariablesResourceType:            true,
    }
    for _, lm := range pkgSum.LabelMappings {
@@ -12,6 +12,7 @@ type stateCoordinator struct {
    mChecks    map[string]*stateCheck
    mEndpoints map[string]*stateEndpoint
    mLabels    map[string]*stateLabel
    mTasks     map[string]*stateTask
    mVariables map[string]*stateVariable

    labelMappings []stateLabelMapping
@@ -23,6 +24,7 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator {
        mChecks:    make(map[string]*stateCheck),
        mEndpoints: make(map[string]*stateEndpoint),
        mLabels:    make(map[string]*stateLabel),
        mTasks:     make(map[string]*stateTask),
        mVariables: make(map[string]*stateVariable),
    }
@@ -50,6 +52,12 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator {
            stateStatus: StateStatusNew,
        }
    }
    for _, pkgTask := range pkg.tasks() {
        state.mTasks[pkgTask.PkgName()] = &stateTask{
            parserTask:  pkgTask,
            stateStatus: StateStatusNew,
        }
    }
    for _, pkgVar := range pkg.variables() {
        state.mVariables[pkgVar.PkgName()] = &stateVariable{
            parserVar: pkgVar,
@@ -92,6 +100,14 @@ func (s *stateCoordinator) labels() []*stateLabel {
    return out
}

func (s *stateCoordinator) tasks() []*stateTask {
    out := make([]*stateTask, 0, len(s.mTasks))
    for _, t := range s.mTasks {
        out = append(out, t)
    }
    return out
}

func (s *stateCoordinator) variables() []*stateVariable {
    out := make([]*stateVariable, 0, len(s.mVariables))
    for _, v := range s.mVariables {
@@ -130,6 +146,13 @@ func (s *stateCoordinator) diff() Diff {
        return diff.Labels[i].PkgName < diff.Labels[j].PkgName
    })

    for _, t := range s.mTasks {
        diff.Tasks = append(diff.Tasks, t.diffTask())
    }
    sort.Slice(diff.Tasks, func(i, j int) bool {
        return diff.Tasks[i].PkgName < diff.Tasks[j].PkgName
    })

    for _, v := range s.mVariables {
        diff.Variables = append(diff.Variables, v.diffVariable())
    }
@@ -199,6 +222,16 @@ func (s *stateCoordinator) summary() Summary {
        return sum.Labels[i].PkgName < sum.Labels[j].PkgName
    })

    for _, t := range s.mTasks {
        if IsRemoval(t.stateStatus) {
            continue
        }
        sum.Tasks = append(sum.Tasks, t.summarize())
    }
    sort.Slice(sum.Tasks, func(i, j int) bool {
        return sum.Tasks[i].PkgName < sum.Tasks[j].PkgName
    })

    for _, v := range s.mVariables {
        if IsRemoval(v.stateStatus) {
            continue
@@ -281,6 +314,12 @@ func (s *stateCoordinator) addObjectForRemoval(k Kind, pkgName string, id influx
            parserEndpoint: &notificationEndpoint{identity: newIdentity},
            stateStatus:    StateStatusRemove,
        }
    case KindTask:
        s.mTasks[pkgName] = &stateTask{
            id:          id,
            parserTask:  &task{identity: newIdentity},
            stateStatus: StateStatusRemove,
        }
    case KindVariable:
        s.mVariables[pkgName] = &stateVariable{
            id: id,
@@ -319,6 +358,12 @@ func (s *stateCoordinator) getObjectIDSetter(k Kind, pkgName string) (func(influ
        return func(id influxdb.ID) {
            r.id = id
            r.stateStatus = StateStatusExists
        }, ok
    case KindTask:
        r, ok := s.mTasks[pkgName]
        return func(id influxdb.ID) {
            r.id = id
            r.stateStatus = StateStatusExists
        }, ok
    case KindVariable:
        r, ok := s.mVariables[pkgName]
        return func(id influxdb.ID) {
@@ -664,6 +709,80 @@ func (e *stateEndpoint) summarize() SummaryNotificationEndpoint {
    return sum
}

type stateTask struct {
    id, orgID   influxdb.ID
    stateStatus StateStatus

    parserTask *task
    existing   *influxdb.Task
}

func (t *stateTask) ID() influxdb.ID {
    if !IsNew(t.stateStatus) && t.existing != nil {
        return t.existing.ID
    }
    return t.id
}

func (t *stateTask) diffTask() DiffTask {
    diff := DiffTask{
        DiffIdentifier: DiffIdentifier{
            ID:      SafeID(t.ID()),
            Remove:  IsRemoval(t.stateStatus),
            PkgName: t.parserTask.PkgName(),
        },
        New: DiffTaskValues{
            Name:        t.parserTask.Name(),
            Cron:        t.parserTask.cron,
            Description: t.parserTask.description,
            Every:       durToStr(t.parserTask.every),
            Offset:      durToStr(t.parserTask.offset),
            Query:       t.parserTask.query,
            Status:      t.parserTask.Status(),
        },
    }

    if t.existing == nil {
        return diff
    }

    diff.Old = &DiffTaskValues{
        Name:        t.existing.Name,
        Cron:        t.existing.Cron,
        Description: t.existing.Description,
        Every:       t.existing.Every,
        Offset:      t.existing.Offset.String(),
        Query:       t.existing.Flux,
        Status:      influxdb.Status(t.existing.Status),
    }

    return diff
}

func (t *stateTask) labels() []*label {
    return t.parserTask.labels
}

func (t *stateTask) resourceType() influxdb.ResourceType {
    return influxdb.TasksResourceType
}

func (t *stateTask) stateIdentity() stateIdentity {
    return stateIdentity{
        id:           t.ID(),
        name:         t.parserTask.Name(),
        pkgName:      t.parserTask.PkgName(),
        resourceType: t.resourceType(),
        stateStatus:  t.stateStatus,
    }
}

func (t *stateTask) summarize() SummaryTask {
    sum := t.parserTask.summarize()
    sum.ID = SafeID(t.id)
    return sum
}

type stateVariable struct {
    id, orgID   influxdb.ID
    stateStatus StateStatus
@@ -1141,32 +1141,35 @@ func TestService(t *testing.T) {
        })

        t.Run("maps tasks with labels", func(t *testing.T) {
            testLabelMappingFn(
                t,
                "testdata/tasks.yml",
                2,
                func() []ServiceSetterFn {
                    fakeTaskSVC := mock.NewTaskService()
                    fakeTaskSVC.CreateTaskFn = func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
                        reg := regexp.MustCompile(`name: "(.+)",`)
                        names := reg.FindStringSubmatch(tc.Flux)
                        if len(names) < 2 {
                            return nil, errors.New("bad flux query provided: " + tc.Flux)
                        }
                        return &influxdb.Task{
                            ID:             influxdb.ID(rand.Int()),
                            Type:           tc.Type,
                            OrganizationID: tc.OrganizationID,
                            OwnerID:        tc.OwnerID,
                            Name:           names[1],
                            Description:    tc.Description,
                            Status:         tc.Status,
                            Flux:           tc.Flux,
                        }, nil
            opts := func() []ServiceSetterFn {
                fakeTaskSVC := mock.NewTaskService()
                fakeTaskSVC.CreateTaskFn = func(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
                    reg := regexp.MustCompile(`name: "(.+)",`)
                    names := reg.FindStringSubmatch(tc.Flux)
                    if len(names) < 2 {
                        return nil, errors.New("bad flux query provided: " + tc.Flux)
                    }
                    return []ServiceSetterFn{WithTaskSVC(fakeTaskSVC)}
                },
            )
                    return &influxdb.Task{
                        ID:             influxdb.ID(rand.Int()),
                        Type:           tc.Type,
                        OrganizationID: tc.OrganizationID,
                        OwnerID:        tc.OwnerID,
                        Name:           names[1],
                        Description:    tc.Description,
                        Status:         tc.Status,
                        Flux:           tc.Flux,
                    }, nil
                }
                return []ServiceSetterFn{WithTaskSVC(fakeTaskSVC)}
            }

            t.Run("applies successfully", func(t *testing.T) {
                testLabelMappingV2ApplyFn(t, "testdata/tasks.yml", 2, opts)
            })

            t.Run("deletes new label mappings on error", func(t *testing.T) {
                testLabelMappingV2RollbackFn(t, "testdata/tasks.yml", 1, opts)
            })
        })

        t.Run("maps telegrafs with labels", func(t *testing.T) {
@@ -1391,11 +1394,15 @@ func TestService(t *testing.T) {
            require.NoError(t, err)

            require.Len(t, sum.Tasks, 2)
            for i, actual := range sum.Tasks {
                assert.NotZero(t, actual.ID)
                assert.Equal(t, "task_"+strconv.Itoa(i), actual.Name)
                assert.Equal(t, "desc_"+strconv.Itoa(i), actual.Description)
            }
            assert.NotZero(t, sum.Tasks[0].ID)
            assert.Equal(t, "task_1", sum.Tasks[0].PkgName)
            assert.Equal(t, "task_1", sum.Tasks[0].Name)
            assert.Equal(t, "desc_1", sum.Tasks[0].Description)

            assert.NotZero(t, sum.Tasks[1].ID)
            assert.Equal(t, "task_UUID", sum.Tasks[1].PkgName)
            assert.Equal(t, "task_0", sum.Tasks[1].Name)
            assert.Equal(t, "desc_0", sum.Tasks[1].Description)
        })
    })