chore(pkger): refactor parser variable state out into stateful type
references: #17434pull/17753/head
parent
cb7f8ba521
commit
c8a79f5653
|
@ -837,14 +837,21 @@ spec:
|
|||
mappings := sum.LabelMappings
|
||||
require.Len(t, mappings, 11)
|
||||
assert.Contains(t, mappings, newSumMapping(bkts[0].ID, bkts[0].PkgName, bkts[0].Name, influxdb.BucketsResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType))
|
||||
//hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
|
||||
|
||||
ch0 := checks[0]
|
||||
assert.Contains(t, mappings, newSumMapping(pkger.SafeID(ch0.Check.GetID()), ch0.PkgName, ch0.Check.GetName(), influxdb.ChecksResourceType))
|
||||
|
||||
ch1 := checks[0]
|
||||
assert.Contains(t, mappings, newSumMapping(pkger.SafeID(ch1.Check.GetID()), ch1.PkgName, ch1.Check.GetName(), influxdb.ChecksResourceType))
|
||||
|
||||
ne := endpoints[0]
|
||||
assert.Contains(t, mappings, newSumMapping(pkger.SafeID(ne.NotificationEndpoint.GetID()), ne.PkgName, ne.NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType))
|
||||
|
||||
assert.Contains(t, mappings, newSumMapping(dashs[0].ID, dashs[0].PkgName, dashs[0].Name, influxdb.DashboardsResourceType))
|
||||
assert.Contains(t, mappings, newSumMapping(rule.ID, rule.PkgName, rule.Name, influxdb.NotificationRuleResourceType))
|
||||
assert.Contains(t, mappings, newSumMapping(task.ID, task.PkgName, task.Name, influxdb.TasksResourceType))
|
||||
assert.Contains(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].PkgName, teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType))
|
||||
assert.Contains(t, mappings, newSumMapping(vars[0].ID, vars[0].PkgName, vars[0].Name, influxdb.VariablesResourceType))
|
||||
})
|
||||
|
||||
t.Run("filtered by resource types", func(t *testing.T) {
|
||||
|
|
|
@ -7825,8 +7825,8 @@ components:
|
|||
items:
|
||||
type: object
|
||||
properties:
|
||||
remove:
|
||||
type: boolean
|
||||
stateStatus:
|
||||
type: string
|
||||
id:
|
||||
type: string
|
||||
pkgName:
|
||||
|
|
161
pkger/models.go
161
pkger/models.go
|
@ -603,31 +603,6 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func newDiffVariable(v *variable, iv *influxdb.Variable) DiffVariable {
|
||||
diff := DiffVariable{
|
||||
DiffIdentifier: DiffIdentifier{
|
||||
ID: SafeID(v.ID()),
|
||||
Remove: v.shouldRemove,
|
||||
PkgName: v.PkgName(),
|
||||
},
|
||||
New: DiffVariableValues{
|
||||
Name: v.Name(),
|
||||
Description: v.Description,
|
||||
Args: v.influxVarArgs(),
|
||||
},
|
||||
}
|
||||
if iv != nil {
|
||||
diff.ID = SafeID(iv.ID)
|
||||
diff.Old = &DiffVariableValues{
|
||||
Name: iv.Name,
|
||||
Description: iv.Description,
|
||||
Args: iv.Arguments,
|
||||
}
|
||||
}
|
||||
|
||||
return diff
|
||||
}
|
||||
|
||||
func (d DiffVariable) hasConflict() bool {
|
||||
return !d.IsNew() && d.Old != nil && !reflect.DeepEqual(*d.Old, d.New)
|
||||
}
|
||||
|
@ -1682,142 +1657,6 @@ func (m mapperTelegrafs) Len() int {
|
|||
return len(m)
|
||||
}
|
||||
|
||||
const (
|
||||
fieldArgTypeConstant = "constant"
|
||||
fieldArgTypeMap = "map"
|
||||
fieldArgTypeQuery = "query"
|
||||
)
|
||||
|
||||
type variable struct {
|
||||
identity
|
||||
|
||||
id influxdb.ID
|
||||
OrgID influxdb.ID
|
||||
Description string
|
||||
Type string
|
||||
Query string
|
||||
Language string
|
||||
ConstValues []string
|
||||
MapValues map[string]string
|
||||
|
||||
labels sortedLabels
|
||||
|
||||
existing *influxdb.Variable
|
||||
}
|
||||
|
||||
func (v *variable) ID() influxdb.ID {
|
||||
if v.existing != nil {
|
||||
return v.existing.ID
|
||||
}
|
||||
return v.id
|
||||
}
|
||||
|
||||
func (v *variable) Exists() bool {
|
||||
return v.existing != nil
|
||||
}
|
||||
|
||||
func (v *variable) Labels() []*label {
|
||||
return v.labels
|
||||
}
|
||||
|
||||
func (v *variable) ResourceType() influxdb.ResourceType {
|
||||
return KindVariable.ResourceType()
|
||||
}
|
||||
|
||||
func (v *variable) shouldApply() bool {
|
||||
return v.existing == nil ||
|
||||
v.existing.Description != v.Description ||
|
||||
v.existing.Arguments == nil ||
|
||||
!reflect.DeepEqual(v.existing.Arguments, v.influxVarArgs())
|
||||
}
|
||||
|
||||
func (v *variable) summarize() SummaryVariable {
|
||||
return SummaryVariable{
|
||||
PkgName: v.PkgName(),
|
||||
ID: SafeID(v.ID()),
|
||||
OrgID: SafeID(v.OrgID),
|
||||
Name: v.Name(),
|
||||
Description: v.Description,
|
||||
Arguments: v.influxVarArgs(),
|
||||
LabelAssociations: toSummaryLabels(v.labels...),
|
||||
}
|
||||
}
|
||||
|
||||
func (v *variable) influxVarArgs() *influxdb.VariableArguments {
|
||||
// this zero value check is for situations where we want to marshal/unmarshal
|
||||
// a variable and not have the invalid args blow up during unmarshaling. When
|
||||
// that validation is decoupled from the unmarshaling, we can clean this up.
|
||||
if v.Type == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
args := &influxdb.VariableArguments{
|
||||
Type: v.Type,
|
||||
}
|
||||
switch args.Type {
|
||||
case "query":
|
||||
args.Values = influxdb.VariableQueryValues{
|
||||
Query: v.Query,
|
||||
Language: v.Language,
|
||||
}
|
||||
case "constant":
|
||||
args.Values = influxdb.VariableConstantValues(v.ConstValues)
|
||||
case "map":
|
||||
args.Values = influxdb.VariableMapValues(v.MapValues)
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func (v *variable) valid() []validationErr {
|
||||
var failures []validationErr
|
||||
switch v.Type {
|
||||
case "map":
|
||||
if len(v.MapValues) == 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldValues,
|
||||
Msg: "map variable must have at least 1 key/val pair",
|
||||
})
|
||||
}
|
||||
case "constant":
|
||||
if len(v.ConstValues) == 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldValues,
|
||||
Msg: "constant variable must have a least 1 value provided",
|
||||
})
|
||||
}
|
||||
case "query":
|
||||
if v.Query == "" {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldQuery,
|
||||
Msg: "query variable must provide a query string",
|
||||
})
|
||||
}
|
||||
if v.Language != "influxql" && v.Language != "flux" {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldLanguage,
|
||||
Msg: fmt.Sprintf(`query variable language must be either "influxql" or "flux"; got %q`, v.Language),
|
||||
})
|
||||
}
|
||||
}
|
||||
if len(failures) > 0 {
|
||||
return []validationErr{
|
||||
objectValidationErr(fieldSpec, failures...),
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type mapperVariables []*variable
|
||||
|
||||
func (m mapperVariables) Association(i int) labelAssociater {
|
||||
return m[i]
|
||||
}
|
||||
|
||||
func (m mapperVariables) Len() int {
|
||||
return len(m)
|
||||
}
|
||||
|
||||
const (
|
||||
fieldDashCharts = "charts"
|
||||
)
|
||||
|
|
|
@ -506,11 +506,6 @@ func TestPkg(t *testing.T) {
|
|||
kind: KindTelegraf,
|
||||
validName: "first_tele_config",
|
||||
},
|
||||
{
|
||||
pkgFile: "testdata/variables.yml",
|
||||
kind: KindVariable,
|
||||
validName: "var_query_1",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
|
@ -442,11 +442,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) {
|
|||
identity: newIdentity,
|
||||
config: influxdb.TelegrafConfig{ID: id},
|
||||
}
|
||||
case KindVariable:
|
||||
p.mVariables[pkgName] = &variable{
|
||||
identity: newIdentity,
|
||||
id: id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -485,11 +480,6 @@ func (p *Pkg) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool
|
|||
return func(id influxdb.ID) {
|
||||
t.config.ID = id
|
||||
}, ok
|
||||
case KindVariable:
|
||||
v, ok := p.mVariables[pkgName]
|
||||
return func(id influxdb.ID) {
|
||||
v.id = id
|
||||
}, ok
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package pkger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -468,3 +469,105 @@ func (s sortedLabels) Less(i, j int) bool {
|
|||
func (s sortedLabels) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
const (
|
||||
fieldArgTypeConstant = "constant"
|
||||
fieldArgTypeMap = "map"
|
||||
fieldArgTypeQuery = "query"
|
||||
)
|
||||
|
||||
type variable struct {
|
||||
identity
|
||||
|
||||
Description string
|
||||
Type string
|
||||
Query string
|
||||
Language string
|
||||
ConstValues []string
|
||||
MapValues map[string]string
|
||||
|
||||
labels sortedLabels
|
||||
}
|
||||
|
||||
func (v *variable) Labels() []*label {
|
||||
return v.labels
|
||||
}
|
||||
|
||||
func (v *variable) ResourceType() influxdb.ResourceType {
|
||||
return KindVariable.ResourceType()
|
||||
}
|
||||
|
||||
func (v *variable) summarize() SummaryVariable {
|
||||
return SummaryVariable{
|
||||
PkgName: v.PkgName(),
|
||||
Name: v.Name(),
|
||||
Description: v.Description,
|
||||
Arguments: v.influxVarArgs(),
|
||||
LabelAssociations: toSummaryLabels(v.labels...),
|
||||
}
|
||||
}
|
||||
|
||||
func (v *variable) influxVarArgs() *influxdb.VariableArguments {
|
||||
// this zero value check is for situations where we want to marshal/unmarshal
|
||||
// a variable and not have the invalid args blow up during unmarshaling. When
|
||||
// that validation is decoupled from the unmarshaling, we can clean this up.
|
||||
if v.Type == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
args := &influxdb.VariableArguments{
|
||||
Type: v.Type,
|
||||
}
|
||||
switch args.Type {
|
||||
case "query":
|
||||
args.Values = influxdb.VariableQueryValues{
|
||||
Query: v.Query,
|
||||
Language: v.Language,
|
||||
}
|
||||
case "constant":
|
||||
args.Values = influxdb.VariableConstantValues(v.ConstValues)
|
||||
case "map":
|
||||
args.Values = influxdb.VariableMapValues(v.MapValues)
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func (v *variable) valid() []validationErr {
|
||||
var failures []validationErr
|
||||
switch v.Type {
|
||||
case "map":
|
||||
if len(v.MapValues) == 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldValues,
|
||||
Msg: "map variable must have at least 1 key/val pair",
|
||||
})
|
||||
}
|
||||
case "constant":
|
||||
if len(v.ConstValues) == 0 {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldValues,
|
||||
Msg: "constant variable must have a least 1 value provided",
|
||||
})
|
||||
}
|
||||
case "query":
|
||||
if v.Query == "" {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldQuery,
|
||||
Msg: "query variable must provide a query string",
|
||||
})
|
||||
}
|
||||
if v.Language != "influxql" && v.Language != "flux" {
|
||||
failures = append(failures, validationErr{
|
||||
Field: fieldLanguage,
|
||||
Msg: fmt.Sprintf(`query variable language must be either "influxql" or "flux"; got %q`, v.Language),
|
||||
})
|
||||
}
|
||||
}
|
||||
if len(failures) > 0 {
|
||||
return []validationErr{
|
||||
objectValidationErr(fieldSpec, failures...),
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
156
pkger/service.go
156
pkger/service.go
|
@ -679,12 +679,12 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts
|
|||
s.dryRunBuckets(ctx, orgID, state.mBuckets)
|
||||
s.dryRunChecks(ctx, orgID, state.mChecks)
|
||||
s.dryRunLabels(ctx, orgID, state.mLabels)
|
||||
s.dryRunVariables(ctx, orgID, state.mVariables)
|
||||
|
||||
var diff Diff
|
||||
diff.Dashboards = s.dryRunDashboards(pkg)
|
||||
diff.Tasks = s.dryRunTasks(pkg)
|
||||
diff.Telegrafs = s.dryRunTelegraf(pkg)
|
||||
diff.Variables = s.dryRunVariables(ctx, orgID, pkg)
|
||||
|
||||
diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg)
|
||||
if err != nil {
|
||||
|
@ -715,6 +715,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts
|
|||
diff.Buckets = stateDiff.Buckets
|
||||
diff.Checks = stateDiff.Checks
|
||||
diff.Labels = stateDiff.Labels
|
||||
diff.Variables = stateDiff.Variables
|
||||
diff.LabelMappings = append(stateDiff.LabelMappings, diff.LabelMappings...)
|
||||
|
||||
return pkg.Summary(), diff, state, parseErr
|
||||
|
@ -924,10 +925,7 @@ func (s *Service) dryRunTelegraf(pkg *Pkg) []DiffTelegraf {
|
|||
return diffs
|
||||
}
|
||||
|
||||
func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffVariable {
|
||||
mExistingLabels := make(map[string]DiffVariable)
|
||||
variables := pkg.variables()
|
||||
|
||||
func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, vars map[string]*stateVariable) {
|
||||
existingVars, _ := s.getAllPlatformVariables(ctx, orgID)
|
||||
|
||||
mIDs := make(map[influxdb.ID]*influxdb.Variable)
|
||||
|
@ -937,29 +935,18 @@ func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, pkg *P
|
|||
mNames[v.Name] = v
|
||||
}
|
||||
|
||||
for i := range variables {
|
||||
pkgVar := variables[i]
|
||||
|
||||
for _, v := range vars {
|
||||
var existing *influxdb.Variable
|
||||
if pkgVar.ID() != 0 {
|
||||
existing = mIDs[pkgVar.ID()]
|
||||
if v.ID() != 0 {
|
||||
existing = mIDs[v.ID()]
|
||||
} else {
|
||||
existing = mNames[pkgVar.Name()]
|
||||
existing = mNames[v.parserVar.Name()]
|
||||
}
|
||||
pkgVar.existing = existing
|
||||
|
||||
mExistingLabels[pkgVar.Name()] = newDiffVariable(pkgVar, existing)
|
||||
if IsNew(v.stateStatus) && existing != nil {
|
||||
v.stateStatus = StateStatusExists
|
||||
}
|
||||
v.existing = existing
|
||||
}
|
||||
|
||||
diffs := make([]DiffVariable, 0, len(mExistingLabels))
|
||||
for _, diff := range mExistingLabels {
|
||||
diffs = append(diffs, diff)
|
||||
}
|
||||
sort.Slice(diffs, func(i, j int) bool {
|
||||
return diffs[i].PkgName < diffs[j].PkgName
|
||||
})
|
||||
|
||||
return diffs
|
||||
}
|
||||
|
||||
type (
|
||||
|
@ -987,7 +974,6 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg, state *stat
|
|||
mapperNotificationRules(pkg.notificationRules()),
|
||||
mapperTasks(pkg.tasks()),
|
||||
mapperTelegrafs(pkg.telegrafs()),
|
||||
mapperVariables(pkg.variables()),
|
||||
}
|
||||
|
||||
diffs := make([]DiffLabelMapping, 0)
|
||||
|
@ -1113,6 +1099,17 @@ func (s *Service) dryRunLabelMappingsV2(ctx context.Context, state *stateCoordin
|
|||
mappings = append(mappings, mm...)
|
||||
}
|
||||
|
||||
for _, v := range state.mVariables {
|
||||
if IsRemoval(v.stateStatus) {
|
||||
continue
|
||||
}
|
||||
mm, err := s.dryRunResourceLabelMappingV2(ctx, state, stateLabelsByResName, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mappings = append(mappings, mm...)
|
||||
}
|
||||
|
||||
return mappings, nil
|
||||
}
|
||||
|
||||
|
@ -1182,6 +1179,13 @@ func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, pkg *P
|
|||
addObjectForRemoval(kind Kind, pkgName string, id influxdb.ID)
|
||||
}
|
||||
|
||||
stateKinds := []Kind{
|
||||
KindBucket,
|
||||
KindCheck,
|
||||
KindLabel,
|
||||
KindVariable,
|
||||
}
|
||||
|
||||
// check resource exists in pkg
|
||||
// if exists
|
||||
// set id on existing pkg resource
|
||||
|
@ -1189,7 +1193,7 @@ func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, pkg *P
|
|||
// add stub pkg resource that indicates it should be deleted
|
||||
for _, r := range stack.Resources {
|
||||
var mapper stateMapper = pkg
|
||||
if r.Kind.is(KindBucket, KindLabel, KindCheck) {
|
||||
if r.Kind.is(stateKinds...) {
|
||||
// hack for time being while we transition state out of pkg.
|
||||
// this will take several passes to finish up.
|
||||
mapper = state
|
||||
|
@ -1312,7 +1316,7 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o
|
|||
},
|
||||
{
|
||||
// primary resources, can have relationships to labels
|
||||
s.applyVariables(ctx, pkg.variables()),
|
||||
s.applyVariables(ctx, state.variables()),
|
||||
s.applyBuckets(ctx, state.buckets()),
|
||||
s.applyChecks(ctx, state.checks()),
|
||||
s.applyDashboards(pkg.dashboards()),
|
||||
|
@ -1356,12 +1360,14 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o
|
|||
pkgSum.Buckets = stateSum.Buckets
|
||||
pkgSum.Checks = stateSum.Checks
|
||||
pkgSum.Labels = stateSum.Labels
|
||||
pkgSum.Variables = stateSum.Variables
|
||||
|
||||
// filter out label mappings that are from pgk and replace with those
|
||||
// in state. This is temporary hack to provide a bridge to the promise land...
|
||||
resourcesToSkip := map[influxdb.ResourceType]bool{
|
||||
influxdb.BucketsResourceType: true,
|
||||
influxdb.ChecksResourceType: true,
|
||||
influxdb.BucketsResourceType: true,
|
||||
influxdb.ChecksResourceType: true,
|
||||
influxdb.VariablesResourceType: true,
|
||||
}
|
||||
for _, lm := range pkgSum.LabelMappings {
|
||||
if resourcesToSkip[lm.ResourceType] {
|
||||
|
@ -2166,17 +2172,17 @@ func (s *Service) applyTelegrafs(teles []*telegraf) applier {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) applyVariables(ctx context.Context, vars []*variable) applier {
|
||||
func (s *Service) applyVariables(ctx context.Context, vars []*stateVariable) applier {
|
||||
const resource = "variable"
|
||||
|
||||
mutex := new(doMutex)
|
||||
rollBackVars := make([]*variable, 0, len(vars))
|
||||
rollBackVars := make([]*stateVariable, 0, len(vars))
|
||||
|
||||
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
|
||||
var v variable
|
||||
var v *stateVariable
|
||||
mutex.Do(func() {
|
||||
vars[i].OrgID = orgID
|
||||
v = *vars[i]
|
||||
vars[i].orgID = orgID
|
||||
v = vars[i]
|
||||
})
|
||||
if !v.shouldApply() {
|
||||
return nil
|
||||
|
@ -2184,7 +2190,7 @@ func (s *Service) applyVariables(ctx context.Context, vars []*variable) applier
|
|||
influxVar, err := s.applyVariable(ctx, v)
|
||||
if err != nil {
|
||||
return &applyErrBody{
|
||||
name: v.Name(),
|
||||
name: v.parserVar.Name(),
|
||||
msg: err.Error(),
|
||||
}
|
||||
}
|
||||
|
@ -2208,20 +2214,21 @@ func (s *Service) applyVariables(ctx context.Context, vars []*variable) applier
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) rollbackVariables(ctx context.Context, variables []*variable) error {
|
||||
rollbackFn := func(v *variable) error {
|
||||
func (s *Service) rollbackVariables(ctx context.Context, variables []*stateVariable) error {
|
||||
rollbackFn := func(v *stateVariable) error {
|
||||
var err error
|
||||
switch {
|
||||
case v.shouldRemove:
|
||||
err = s.varSVC.CreateVariable(ctx, v.existing)
|
||||
case v.existing == nil:
|
||||
err = s.varSVC.DeleteVariable(ctx, v.ID())
|
||||
default:
|
||||
switch v.stateStatus {
|
||||
case StateStatusRemove:
|
||||
err = ierrors.Wrap(s.varSVC.CreateVariable(ctx, v.existing), "rolling back removed variable")
|
||||
case StateStatusExists:
|
||||
_, err = s.varSVC.UpdateVariable(ctx, v.ID(), &influxdb.VariableUpdate{
|
||||
Name: v.Name(),
|
||||
Description: v.Description,
|
||||
Arguments: v.influxVarArgs(),
|
||||
Name: v.parserVar.Name(),
|
||||
Description: v.parserVar.Description,
|
||||
Arguments: v.parserVar.influxVarArgs(),
|
||||
})
|
||||
err = ierrors.Wrap(err, "rolling back updated variable")
|
||||
default:
|
||||
err = ierrors.Wrap(s.varSVC.DeleteVariable(ctx, v.ID()), "rolling back created variable")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -2240,38 +2247,37 @@ func (s *Service) rollbackVariables(ctx context.Context, variables []*variable)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) applyVariable(ctx context.Context, v variable) (influxdb.Variable, error) {
|
||||
if v.shouldRemove {
|
||||
func (s *Service) applyVariable(ctx context.Context, v *stateVariable) (influxdb.Variable, error) {
|
||||
switch v.stateStatus {
|
||||
case StateStatusRemove:
|
||||
if err := s.varSVC.DeleteVariable(ctx, v.id); err != nil {
|
||||
return influxdb.Variable{}, err
|
||||
}
|
||||
return *v.existing, nil
|
||||
}
|
||||
|
||||
if v.existing != nil {
|
||||
case StateStatusExists:
|
||||
updatedVar, err := s.varSVC.UpdateVariable(ctx, v.ID(), &influxdb.VariableUpdate{
|
||||
Name: v.Name(),
|
||||
Description: v.Description,
|
||||
Arguments: v.influxVarArgs(),
|
||||
Name: v.parserVar.Name(),
|
||||
Description: v.parserVar.Description,
|
||||
Arguments: v.parserVar.influxVarArgs(),
|
||||
})
|
||||
if err != nil {
|
||||
return influxdb.Variable{}, err
|
||||
}
|
||||
return *updatedVar, nil
|
||||
}
|
||||
default:
|
||||
influxVar := influxdb.Variable{
|
||||
OrganizationID: v.orgID,
|
||||
Name: v.parserVar.Name(),
|
||||
Description: v.parserVar.Description,
|
||||
Arguments: v.parserVar.influxVarArgs(),
|
||||
}
|
||||
err := s.varSVC.CreateVariable(ctx, &influxVar)
|
||||
if err != nil {
|
||||
return influxdb.Variable{}, err
|
||||
}
|
||||
|
||||
influxVar := influxdb.Variable{
|
||||
OrganizationID: v.OrgID,
|
||||
Name: v.Name(),
|
||||
Description: v.Description,
|
||||
Arguments: v.influxVarArgs(),
|
||||
return influxVar, nil
|
||||
}
|
||||
err := s.varSVC.CreateVariable(ctx, &influxVar)
|
||||
if err != nil {
|
||||
return influxdb.Variable{}, err
|
||||
}
|
||||
|
||||
return influxVar, nil
|
||||
}
|
||||
|
||||
func (s *Service) applyLabelMappingsV2(labelMappings []stateLabelMapping) applier {
|
||||
|
@ -2485,15 +2491,15 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.
|
|||
Name: l.parserLabel.PkgName(),
|
||||
})
|
||||
}
|
||||
for _, v := range pkg.variables() {
|
||||
if v.shouldRemove {
|
||||
for _, v := range state.mVariables {
|
||||
if IsRemoval(v.stateStatus) {
|
||||
continue
|
||||
}
|
||||
stackResources = append(stackResources, StackResource{
|
||||
APIVersion: APIVersion,
|
||||
ID: v.ID(),
|
||||
Kind: KindVariable,
|
||||
Name: v.PkgName(),
|
||||
Name: v.parserVar.PkgName(),
|
||||
})
|
||||
}
|
||||
stack.Resources = stackResources
|
||||
|
@ -2557,13 +2563,11 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb
|
|||
res.ID = l.existing.ID
|
||||
}
|
||||
}
|
||||
for _, v := range pkg.variables() {
|
||||
if v.shouldRemove {
|
||||
res := existingResources[newKey(KindVariable, v.PkgName())]
|
||||
if res.ID != v.ID() {
|
||||
hasChanges = true
|
||||
res.ID = v.existing.ID
|
||||
}
|
||||
for _, v := range state.mVariables {
|
||||
res, ok := existingResources[newKey(KindVariable, v.parserVar.PkgName())]
|
||||
if ok && res.ID != v.ID() {
|
||||
hasChanges = true
|
||||
res.ID = v.existing.ID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,24 +1,27 @@
|
|||
package pkger
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
)
|
||||
|
||||
type stateCoordinator struct {
|
||||
mBuckets map[string]*stateBucket
|
||||
mChecks map[string]*stateCheck
|
||||
mLabels map[string]*stateLabel
|
||||
mBuckets map[string]*stateBucket
|
||||
mChecks map[string]*stateCheck
|
||||
mLabels map[string]*stateLabel
|
||||
mVariables map[string]*stateVariable
|
||||
|
||||
labelMappings []stateLabelMapping
|
||||
}
|
||||
|
||||
func newStateCoordinator(pkg *Pkg) *stateCoordinator {
|
||||
state := stateCoordinator{
|
||||
mBuckets: make(map[string]*stateBucket),
|
||||
mChecks: make(map[string]*stateCheck),
|
||||
mLabels: make(map[string]*stateLabel),
|
||||
mBuckets: make(map[string]*stateBucket),
|
||||
mChecks: make(map[string]*stateCheck),
|
||||
mLabels: make(map[string]*stateLabel),
|
||||
mVariables: make(map[string]*stateVariable),
|
||||
}
|
||||
|
||||
for _, pkgBkt := range pkg.buckets() {
|
||||
|
@ -39,6 +42,12 @@ func newStateCoordinator(pkg *Pkg) *stateCoordinator {
|
|||
stateStatus: StateStatusNew,
|
||||
}
|
||||
}
|
||||
for _, pkgVar := range pkg.variables() {
|
||||
state.mVariables[pkgVar.PkgName()] = &stateVariable{
|
||||
parserVar: pkgVar,
|
||||
stateStatus: StateStatusNew,
|
||||
}
|
||||
}
|
||||
|
||||
return &state
|
||||
}
|
||||
|
@ -67,6 +76,14 @@ func (s *stateCoordinator) labels() []*stateLabel {
|
|||
return out
|
||||
}
|
||||
|
||||
func (s *stateCoordinator) variables() []*stateVariable {
|
||||
out := make([]*stateVariable, 0, len(s.mVariables))
|
||||
for _, v := range s.mVariables {
|
||||
out = append(out, v)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *stateCoordinator) diff() Diff {
|
||||
var diff Diff
|
||||
for _, b := range s.mBuckets {
|
||||
|
@ -90,6 +107,13 @@ func (s *stateCoordinator) diff() Diff {
|
|||
return diff.Labels[i].PkgName < diff.Labels[j].PkgName
|
||||
})
|
||||
|
||||
for _, v := range s.mVariables {
|
||||
diff.Variables = append(diff.Variables, v.diffVariable())
|
||||
}
|
||||
sort.Slice(diff.Variables, func(i, j int) bool {
|
||||
return diff.Variables[i].PkgName < diff.Variables[j].PkgName
|
||||
})
|
||||
|
||||
for _, m := range s.labelMappings {
|
||||
diff.LabelMappings = append(diff.LabelMappings, m.diffLabelMapping())
|
||||
}
|
||||
|
@ -145,6 +169,16 @@ func (s *stateCoordinator) summary() Summary {
|
|||
return sum.Labels[i].PkgName < sum.Labels[j].PkgName
|
||||
})
|
||||
|
||||
for _, v := range s.mVariables {
|
||||
if IsRemoval(v.stateStatus) {
|
||||
continue
|
||||
}
|
||||
sum.Variables = append(sum.Variables, v.summarize())
|
||||
}
|
||||
sort.Slice(sum.Variables, func(i, j int) bool {
|
||||
return sum.Variables[i].PkgName < sum.Variables[j].PkgName
|
||||
})
|
||||
|
||||
for _, v := range s.labelMappings {
|
||||
sum.LabelMappings = append(sum.LabelMappings, v.summarize())
|
||||
}
|
||||
|
@ -208,6 +242,12 @@ func (s *stateCoordinator) addObjectForRemoval(k Kind, pkgName string, id influx
|
|||
parserLabel: &label{identity: newIdentity},
|
||||
stateStatus: StateStatusRemove,
|
||||
}
|
||||
case KindVariable:
|
||||
s.mVariables[pkgName] = &stateVariable{
|
||||
id: id,
|
||||
parserVar: &variable{identity: newIdentity},
|
||||
stateStatus: StateStatusRemove,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,6 +271,12 @@ func (s *stateCoordinator) getObjectIDSetter(k Kind, pkgName string) (func(influ
|
|||
r.id = id
|
||||
r.stateStatus = StateStatusExists
|
||||
}, ok
|
||||
case KindVariable:
|
||||
r, ok := s.mVariables[pkgName]
|
||||
return func(id influxdb.ID) {
|
||||
r.id = id
|
||||
r.stateStatus = StateStatusExists
|
||||
}, ok
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
|
@ -245,7 +291,7 @@ type stateIdentity struct {
|
|||
}
|
||||
|
||||
func (s stateIdentity) exists() bool {
|
||||
return s.id != 0
|
||||
return IsExisting(s.stateStatus)
|
||||
}
|
||||
|
||||
type stateBucket struct {
|
||||
|
@ -503,16 +549,92 @@ func stateLabelMappingToInfluxLabelMapping(mapping stateLabelMapping) influxdb.L
|
|||
}
|
||||
}
|
||||
|
||||
type stateVariable struct {
|
||||
id, orgID influxdb.ID
|
||||
stateStatus StateStatus
|
||||
|
||||
parserVar *variable
|
||||
existing *influxdb.Variable
|
||||
}
|
||||
|
||||
func (v *stateVariable) ID() influxdb.ID {
|
||||
if !IsNew(v.stateStatus) && v.existing != nil {
|
||||
return v.existing.ID
|
||||
}
|
||||
return v.id
|
||||
}
|
||||
|
||||
func (v *stateVariable) diffVariable() DiffVariable {
|
||||
diff := DiffVariable{
|
||||
DiffIdentifier: DiffIdentifier{
|
||||
ID: SafeID(v.ID()),
|
||||
Remove: IsRemoval(v.stateStatus),
|
||||
StateStatus: v.stateStatus,
|
||||
PkgName: v.parserVar.PkgName(),
|
||||
},
|
||||
New: DiffVariableValues{
|
||||
Name: v.parserVar.Name(),
|
||||
Description: v.parserVar.Description,
|
||||
Args: v.parserVar.influxVarArgs(),
|
||||
},
|
||||
}
|
||||
if iv := v.existing; iv != nil {
|
||||
diff.Old = &DiffVariableValues{
|
||||
Name: iv.Name,
|
||||
Description: iv.Description,
|
||||
Args: iv.Arguments,
|
||||
}
|
||||
}
|
||||
|
||||
return diff
|
||||
}
|
||||
|
||||
func (v *stateVariable) labels() []*label {
|
||||
return v.parserVar.labels
|
||||
}
|
||||
|
||||
func (v *stateVariable) resourceType() influxdb.ResourceType {
|
||||
return KindVariable.ResourceType()
|
||||
}
|
||||
|
||||
func (v *stateVariable) shouldApply() bool {
|
||||
return IsRemoval(v.stateStatus) ||
|
||||
v.existing == nil ||
|
||||
v.existing.Description != v.parserVar.Description ||
|
||||
v.existing.Arguments == nil ||
|
||||
!reflect.DeepEqual(v.existing.Arguments, v.parserVar.influxVarArgs())
|
||||
}
|
||||
|
||||
func (v *stateVariable) stateIdentity() stateIdentity {
|
||||
return stateIdentity{
|
||||
id: v.ID(),
|
||||
name: v.parserVar.Name(),
|
||||
pkgName: v.parserVar.PkgName(),
|
||||
resourceType: v.resourceType(),
|
||||
stateStatus: v.stateStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func (v *stateVariable) summarize() SummaryVariable {
|
||||
sum := v.parserVar.summarize()
|
||||
sum.ID = SafeID(v.ID())
|
||||
sum.OrgID = SafeID(v.orgID)
|
||||
return sum
|
||||
}
|
||||
|
||||
// IsNew identifies state status as new to the platform.
|
||||
func IsNew(status StateStatus) bool {
|
||||
// defaulting zero value to identify as new
|
||||
return status == StateStatusNew || status == ""
|
||||
}
|
||||
|
||||
// IsExisting identifies state status as existing in the platform.
|
||||
func IsExisting(status StateStatus) bool {
|
||||
return status == StateStatusExists
|
||||
}
|
||||
|
||||
// IsRemoval identifies state status as existing resource that will be removed
|
||||
// from the platform.
|
||||
func IsRemoval(status StateStatus) bool {
|
||||
return status == StateStatusRemove
|
||||
}
|
||||
|
|
|
@ -420,8 +420,9 @@ func TestService(t *testing.T) {
|
|||
|
||||
expected := DiffVariable{
|
||||
DiffIdentifier: DiffIdentifier{
|
||||
ID: 1,
|
||||
PkgName: "var_const_3",
|
||||
ID: 1,
|
||||
PkgName: "var_const_3",
|
||||
StateStatus: StateStatusExists,
|
||||
},
|
||||
Old: &DiffVariableValues{
|
||||
Name: "var_const_3",
|
||||
|
@ -441,7 +442,8 @@ func TestService(t *testing.T) {
|
|||
expected = DiffVariable{
|
||||
DiffIdentifier: DiffIdentifier{
|
||||
// no ID here since this one would be new
|
||||
PkgName: "var_map_4",
|
||||
PkgName: "var_map_4",
|
||||
StateStatus: StateStatusNew,
|
||||
},
|
||||
New: DiffVariableValues{
|
||||
Name: "var_map_4",
|
||||
|
@ -1178,21 +1180,23 @@ func TestService(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("maps variables with labels", func(t *testing.T) {
|
||||
testLabelMappingFn(
|
||||
t,
|
||||
"testdata/variable_associates_label.yml",
|
||||
1,
|
||||
func() []ServiceSetterFn {
|
||||
fakeVarSVC := mock.NewVariableService()
|
||||
fakeVarSVC.CreateVariableF = func(_ context.Context, v *influxdb.Variable) error {
|
||||
v.ID = influxdb.ID(rand.Int())
|
||||
return nil
|
||||
}
|
||||
return []ServiceSetterFn{WithVariableSVC(fakeVarSVC)}
|
||||
},
|
||||
)
|
||||
})
|
||||
opt := func() []ServiceSetterFn {
|
||||
fakeVarSVC := mock.NewVariableService()
|
||||
fakeVarSVC.CreateVariableF = func(_ context.Context, v *influxdb.Variable) error {
|
||||
v.ID = influxdb.ID(rand.Int())
|
||||
return nil
|
||||
}
|
||||
return []ServiceSetterFn{WithVariableSVC(fakeVarSVC)}
|
||||
}
|
||||
|
||||
t.Run("applies successfully", func(t *testing.T) {
|
||||
testLabelMappingV2ApplyFn(t, "testdata/variable_associates_label.yml", 1, opt)
|
||||
})
|
||||
|
||||
t.Run("deletes new label mappings on error", func(t *testing.T) {
|
||||
testLabelMappingV2RollbackFn(t, "testdata/variable_associates_label.yml", 0, opt)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("notification endpoints", func(t *testing.T) {
|
||||
|
@ -1488,7 +1492,8 @@ func TestService(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, sum.Variables, 4)
|
||||
expected := sum.Variables[1]
|
||||
|
||||
expected := sum.Variables[0]
|
||||
assert.True(t, expected.ID > 0 && expected.ID < 5)
|
||||
assert.Equal(t, SafeID(orgID), expected.OrgID)
|
||||
assert.Equal(t, "var_const_3", expected.Name)
|
||||
|
@ -1562,7 +1567,7 @@ func TestService(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, sum.Variables, 4)
|
||||
expected := sum.Variables[1]
|
||||
expected := sum.Variables[0]
|
||||
assert.Equal(t, SafeID(1), expected.ID)
|
||||
assert.Equal(t, "var_const_3", expected.Name)
|
||||
|
||||
|
|
Loading…
Reference in New Issue