chore(pkger): refactor pkger data model to separate parsed and stateful datum

pull/17737/head
Johnny Steenbergen 2020-04-10 21:51:13 -07:00 committed by Johnny Steenbergen
parent 4d2e06f998
commit c718b676ef
20 changed files with 1640 additions and 992 deletions

View File

@ -187,7 +187,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn(cmd *cobra.Command, args []string) error
opts = append(opts, pkger.ApplyWithSecrets(providedSecrets))
summary, err := svc.Apply(context.Background(), influxOrgID, 0, pkg, opts...)
summary, _, err := svc.Apply(context.Background(), influxOrgID, 0, pkg, opts...)
if err != nil {
return err
}
@ -973,18 +973,18 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) error {
Title("Label Associations").
SetHeaders(
"Resource Type",
"Resource Name", "Resource ID",
"Label Name", "Label ID",
"Resource Package Name", "Resource Name", "Resource ID",
"Label Package Name", "Label Name", "Label ID",
)
for _, m := range diff.LabelMappings {
newRow := []string{
string(m.ResType),
m.ResName, m.ResID.String(),
m.LabelName, m.LabelID.String(),
m.ResPkgName, m.ResName, m.ResID.String(),
m.LabelPkgName, m.LabelName, m.LabelID.String(),
}
oldRow := newRow
if m.IsNew {
if pkger.IsNew(m.StateStatus) {
oldRow = nil
}
printer.AppendDiff(oldRow, newRow)

View File

@ -705,7 +705,7 @@ type fakePkgSVC struct {
initStackFn func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
createFn func(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
}
func (f *fakePkgSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
@ -729,7 +729,7 @@ func (f *fakePkgSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg
panic("not implemented")
}
func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) {
func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if f.applyFn != nil {
return f.applyFn(ctx, orgID, userID, pkg, opts...)
}

View File

@ -24,6 +24,7 @@ func TestLauncher_Pkger(t *testing.T) {
l := RunTestLauncherOrFail(t, ctx)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID))
svc := l.PkgerService(t)
@ -151,9 +152,8 @@ func TestLauncher_Pkger(t *testing.T) {
)
var initialSum pkger.Summary
t.Log("apply pkg with stack id")
{
sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, initialPkg, applyOpt)
t.Run("apply pkg with stack id", func(t *testing.T) {
sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, initialPkg, applyOpt)
require.NoError(t, err)
initialSum = sum
@ -198,7 +198,7 @@ func TestLauncher_Pkger(t *testing.T) {
actualVar := resourceCheck.mustGetVariable(t, byName("var char"))
assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID))
}
}
})
var (
updateBucketName = "new bucket"
@ -207,8 +207,7 @@ func TestLauncher_Pkger(t *testing.T) {
updateLabelName = "new label"
updateVariableName = "new variable"
)
t.Log("apply pkg with stack id where resources change")
{
t.Run("apply pkg with stack id where resources change", func(t *testing.T) {
updatedPkg := newPkg(
newBucketObject(initialBucketPkgName, updateBucketName, ""),
newCheckDeadmanObject(t, initialCheckPkgName, updateCheckName, time.Hour),
@ -216,7 +215,7 @@ func TestLauncher_Pkger(t *testing.T) {
newLabelObject(initialLabelPkgName, updateLabelName, "", ""),
newVariableObject(initialVariablePkgName, updateVariableName, ""),
)
sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt)
sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt)
require.NoError(t, err)
require.Len(t, sum.Buckets, 1)
@ -257,10 +256,9 @@ func TestLauncher_Pkger(t *testing.T) {
actualVar := resourceCheck.mustGetVariable(t, byName(updateVariableName))
assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID))
}
}
})
t.Log("an error during application roles back resources to previous state")
{
t.Run("an error during application roles back resources to previous state", func(t *testing.T) {
logger := l.log.With(zap.String("service", "pkger"))
var svc pkger.SVC = pkger.NewService(
pkger.WithLogger(logger),
@ -288,9 +286,27 @@ func TestLauncher_Pkger(t *testing.T) {
newEndpointHTTP("z_endpoint_rolls_back", "", ""),
newVariableObject("z_var_rolls_back", "", ""),
)
_, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt)
_, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt)
require.Error(t, err)
t.Log("\tvalidate all resources are rolled back")
{
actualBkt := resourceCheck.mustGetBucket(t, byName(updateBucketName))
assert.NotEqual(t, initialSum.Buckets[0].ID, pkger.SafeID(actualBkt.ID))
actualCheck := resourceCheck.mustGetCheck(t, byName(updateCheckName))
assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), actualCheck.GetID())
actualEndpoint := resourceCheck.mustGetEndpoint(t, byName(updateEndpointName))
assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpoint.GetID())
actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName))
assert.NotEqual(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID))
actualVariable := resourceCheck.mustGetVariable(t, byName(updateVariableName))
assert.NotEqual(t, initialSum.Variables[0].ID, pkger.SafeID(actualVariable.ID))
}
t.Log("\tvalidate all changes do not persist")
{
for _, name := range []string{"z_roll_me_back", "z_rolls_back_too"} {
@ -310,28 +326,9 @@ func TestLauncher_Pkger(t *testing.T) {
_, err = resourceCheck.getVariable(t, byName("z_var_rolls_back"))
require.Error(t, err)
}
})
t.Log("\tvalidate all resources are rolled back")
{
actualBkt := resourceCheck.mustGetBucket(t, byName(updateBucketName))
assert.NotEqual(t, initialSum.Buckets[0].ID, pkger.SafeID(actualBkt.ID))
actualCheck := resourceCheck.mustGetCheck(t, byName(updateCheckName))
assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), actualCheck.GetID())
actualEndpoint := resourceCheck.mustGetEndpoint(t, byName(updateEndpointName))
assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpoint.GetID())
actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName))
assert.NotEqual(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID))
actualVariable := resourceCheck.mustGetVariable(t, byName(updateVariableName))
assert.NotEqual(t, initialSum.Variables[0].ID, pkger.SafeID(actualVariable.ID))
}
}
t.Log("apply pkg with stack id where resources have been removed since last run")
{
t.Run("apply pkg with stack id where resources have been removed since last run", func(t *testing.T) {
allNewResourcesPkg := newPkg(
newBucketObject("non_existent_bucket", "", ""),
newCheckDeadmanObject(t, "non_existent_check", "", time.Minute),
@ -339,7 +336,7 @@ func TestLauncher_Pkger(t *testing.T) {
newLabelObject("non_existent_label", "", "", ""),
newVariableObject("non_existent_var", "", ""),
)
sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt)
sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt)
require.NoError(t, err)
require.Len(t, sum.Buckets, 1)
@ -413,7 +410,7 @@ func TestLauncher_Pkger(t *testing.T) {
_, err = resourceCheck.getVariable(t, byName(updateVariableName))
require.Error(t, err)
}
}
})
})
})
@ -433,7 +430,7 @@ func TestLauncher_Pkger(t *testing.T) {
pkger.WithVariableSVC(l.VariableService(t)),
)
_, err := svc.Apply(ctx, l.Org.ID, l.User.ID, newPkg(t))
_, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, newPkg(t))
require.Error(t, err)
bkts, _, err := l.BucketService(t).FindBuckets(ctx, influxdb.BucketFilter{OrganizationID: &l.Org.ID})
@ -503,17 +500,6 @@ func TestLauncher_Pkger(t *testing.T) {
}
}
hasMapping := func(t *testing.T, actuals []pkger.SummaryLabelMapping, expected pkger.SummaryLabelMapping) {
t.Helper()
for _, actual := range actuals {
if actual == expected {
return
}
}
require.FailNowf(t, "did not find expected mapping", "expected: %v", expected)
}
t.Run("dry run a package with no existing resources", func(t *testing.T) {
sum, diff, err := svc.DryRun(ctx, l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)
@ -639,137 +625,114 @@ spec:
t.Run("apply a package of all new resources", func(t *testing.T) {
// this initial test is also setup for the sub tests
sum1, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t))
sum1, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)
verifyCompleteSummary := func(t *testing.T, sum1 pkger.Summary, exportAllSum bool) {
t.Helper()
labels := sum1.Labels
require.Len(t, labels, 2)
assert.NotZero(t, labels[0].ID)
assert.Equal(t, "label_1", labels[0].Name)
assert.Equal(t, "the 2nd label", labels[1].Name)
labels := sum1.Labels
require.Len(t, labels, 2)
if !exportAllSum {
assert.NotZero(t, labels[0].ID)
}
assert.Equal(t, "label_1", labels[0].Name)
assert.Equal(t, "the 2nd label", labels[1].Name)
bkts := sum1.Buckets
require.Len(t, bkts, 1)
assert.NotZero(t, bkts[0].ID)
assert.NotEmpty(t, bkts[0].PkgName)
assert.Equal(t, "rucketeer", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 2, "label_1", "the 2nd label")
bkts := sum1.Buckets
if exportAllSum {
require.Len(t, bkts, 2)
assert.Equal(t, l.Bucket.Name, bkts[0].Name)
bkts = bkts[1:]
}
require.Len(t, bkts, 1)
if !exportAllSum {
assert.NotZero(t, bkts[0].ID)
}
assert.Equal(t, "rucketeer", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 2, "label_1", "the 2nd label")
checks := sum1.Checks
require.Len(t, checks, 2)
assert.Equal(t, "check 0 name", checks[0].Check.GetName())
hasLabelAssociations(t, checks[0].LabelAssociations, 1, "label_1")
assert.Equal(t, "check_1", checks[1].Check.GetName())
hasLabelAssociations(t, checks[1].LabelAssociations, 1, "label_1")
for _, ch := range checks {
if !exportAllSum {
assert.NotZero(t, ch.Check.GetID())
}
}
dashs := sum1.Dashboards
require.Len(t, dashs, 1)
if !exportAllSum {
assert.NotZero(t, dashs[0].ID)
}
assert.Equal(t, "dash_1", dashs[0].Name)
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 2, "label_1", "the 2nd label")
require.Len(t, dashs[0].Charts, 1)
assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType())
endpoints := sum1.NotificationEndpoints
require.Len(t, endpoints, 1)
if !exportAllSum {
assert.NotZero(t, endpoints[0].NotificationEndpoint.GetID())
}
assert.Equal(t, "no auth endpoint", endpoints[0].NotificationEndpoint.GetName())
assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription())
assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus()))
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
require.Len(t, sum1.NotificationRules, 1)
rule := sum1.NotificationRules[0]
if !exportAllSum {
assert.NotZero(t, rule.ID)
}
assert.Equal(t, "rule_0", rule.Name)
assert.Equal(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), rule.EndpointID)
if !exportAllSum {
assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName)
assert.Equalf(t, "http", rule.EndpointType, "rule: %+v", rule)
} else {
assert.NotEmpty(t, rule.EndpointName)
}
require.Len(t, sum1.Tasks, 1)
task := sum1.Tasks[0]
if !exportAllSum {
assert.NotZero(t, task.ID)
}
assert.Equal(t, "task_1", task.Name)
assert.Equal(t, "desc_1", task.Description)
teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
if !exportAllSum {
assert.NotZero(t, teles[0].TelegrafConfig.ID)
assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID)
}
assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name)
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
assert.Equal(t, telConf, teles[0].TelegrafConfig.Config)
vars := sum1.Variables
require.Len(t, vars, 1)
if !exportAllSum {
assert.NotZero(t, vars[0].ID)
}
assert.Equal(t, "query var", vars[0].Name)
hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1")
varArgs := vars[0].Arguments
require.NotNil(t, varArgs)
assert.Equal(t, "query", varArgs.Type)
assert.Equal(t, influxdb.VariableQueryValues{
Query: "buckets() |> filter(fn: (r) => r.name !~ /^_/) |> rename(columns: {name: \"_value\"}) |> keep(columns: [\"_value\"])",
Language: "flux",
}, varArgs.Values)
newSumMapping := func(id pkger.SafeID, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping {
return pkger.SummaryLabelMapping{
ResourceName: name,
LabelName: labels[0].Name,
LabelID: labels[0].ID,
ResourceID: id,
ResourceType: rt,
}
}
mappings := sum1.LabelMappings
require.Len(t, mappings, 11)
hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType))
hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType))
hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType))
hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType))
hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType))
hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
checks := sum1.Checks
require.Len(t, checks, 2)
assert.Equal(t, "check 0 name", checks[0].Check.GetName())
hasLabelAssociations(t, checks[0].LabelAssociations, 1, "label_1")
assert.Equal(t, "check_1", checks[1].Check.GetName())
hasLabelAssociations(t, checks[1].LabelAssociations, 1, "label_1")
for _, ch := range checks {
assert.NotZero(t, ch.Check.GetID())
}
verifyCompleteSummary(t, sum1, false)
dashs := sum1.Dashboards
require.Len(t, dashs, 1)
assert.NotZero(t, dashs[0].ID)
assert.NotEmpty(t, dashs[0].Name)
assert.Equal(t, "dash_1", dashs[0].Name)
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 2, "label_1", "the 2nd label")
require.Len(t, dashs[0].Charts, 1)
assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType())
endpoints := sum1.NotificationEndpoints
require.Len(t, endpoints, 1)
assert.NotZero(t, endpoints[0].NotificationEndpoint.GetID())
assert.Equal(t, "no auth endpoint", endpoints[0].NotificationEndpoint.GetName())
assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription())
assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus()))
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
require.Len(t, sum1.NotificationRules, 1)
rule := sum1.NotificationRules[0]
assert.NotZero(t, rule.ID)
assert.Equal(t, "rule_0", rule.Name)
assert.Equal(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), rule.EndpointID)
assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName)
assert.Equalf(t, "http", rule.EndpointType, "rule: %+v", rule)
require.Len(t, sum1.Tasks, 1)
task := sum1.Tasks[0]
assert.NotZero(t, task.ID)
assert.Equal(t, "task_1", task.Name)
assert.Equal(t, "desc_1", task.Description)
teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
assert.NotZero(t, teles[0].TelegrafConfig.ID)
assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID)
assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name)
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
assert.Equal(t, telConf, teles[0].TelegrafConfig.Config)
vars := sum1.Variables
require.Len(t, vars, 1)
assert.NotZero(t, vars[0].ID)
assert.Equal(t, "query var", vars[0].Name)
hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1")
varArgs := vars[0].Arguments
require.NotNil(t, varArgs)
assert.Equal(t, "query", varArgs.Type)
assert.Equal(t, influxdb.VariableQueryValues{
Query: "buckets() |> filter(fn: (r) => r.name !~ /^_/) |> rename(columns: {name: \"_value\"}) |> keep(columns: [\"_value\"])",
Language: "flux",
}, varArgs.Values)
newSumMapping := func(id pkger.SafeID, pkgName, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping {
return pkger.SummaryLabelMapping{
Status: pkger.StateStatusNew,
ResourceID: id,
ResourceType: rt,
ResourcePkgName: pkgName,
ResourceName: name,
LabelPkgName: labels[0].PkgName,
LabelName: labels[0].Name,
LabelID: labels[0].ID,
}
}
mappings := sum1.LabelMappings
mappingsContain := func(t *testing.T, id pkger.SafeID, pkgName, name string, resourceType influxdb.ResourceType) {
t.Helper()
assert.Contains(t, mappings, newSumMapping(id, pkgName, name, resourceType))
}
require.Len(t, mappings, 11)
mappingsContain(t, bkts[0].ID, bkts[0].PkgName, bkts[0].Name, influxdb.BucketsResourceType)
mappingsContain(t, pkger.SafeID(checks[0].Check.GetID()), checks[0].PkgName, checks[0].Check.GetName(), influxdb.ChecksResourceType)
mappingsContain(t, dashs[0].ID, dashs[0].PkgName, dashs[0].Name, influxdb.DashboardsResourceType)
mappingsContain(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].PkgName, endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType)
mappingsContain(t, rule.ID, rule.PkgName, rule.Name, influxdb.NotificationRuleResourceType)
mappingsContain(t, task.ID, task.PkgName, task.Name, influxdb.TasksResourceType)
mappingsContain(t, pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].PkgName, teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType)
mappingsContain(t, vars[0].ID, vars[0].PkgName, vars[0].Name, influxdb.VariablesResourceType)
var (
// used in dependent subtests
@ -793,7 +756,95 @@ spec:
))
require.NoError(t, err)
verifyCompleteSummary(t, newPkg.Summary(), true)
sum := newPkg.Summary()
labels := sum.Labels
require.Len(t, labels, 2)
assert.Equal(t, "label_1", labels[0].Name)
assert.Equal(t, "the 2nd label", labels[1].Name)
bkts := sum.Buckets
require.Len(t, bkts, 1)
assert.NotEmpty(t, bkts[0].PkgName)
assert.Equal(t, "rucketeer", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 2, "label_1", "the 2nd label")
checks := sum.Checks
require.Len(t, checks, 2)
assert.Equal(t, "check 0 name", checks[0].Check.GetName())
hasLabelAssociations(t, checks[0].LabelAssociations, 1, "label_1")
assert.Equal(t, "check_1", checks[1].Check.GetName())
hasLabelAssociations(t, checks[1].LabelAssociations, 1, "label_1")
dashs := sum.Dashboards
require.Len(t, dashs, 1)
assert.NotEmpty(t, dashs[0].Name)
assert.Equal(t, "dash_1", dashs[0].Name)
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 2, "label_1", "the 2nd label")
require.Len(t, dashs[0].Charts, 1)
assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType())
endpoints := sum.NotificationEndpoints
require.Len(t, endpoints, 1)
assert.Equal(t, "no auth endpoint", endpoints[0].NotificationEndpoint.GetName())
assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription())
assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus()))
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
require.Len(t, sum.NotificationRules, 1)
rule := sum.NotificationRules[0]
assert.Equal(t, "rule_0", rule.Name)
assert.Equal(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), rule.EndpointID)
assert.NotEmpty(t, rule.EndpointName)
require.Len(t, sum.Tasks, 1)
task := sum.Tasks[0]
assert.Equal(t, "task_1", task.Name)
assert.Equal(t, "desc_1", task.Description)
teles := sum.TelegrafConfigs
require.Len(t, teles, 1)
assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name)
assert.Equal(t, "desc", teles[0].TelegrafConfig.Description)
assert.Equal(t, telConf, teles[0].TelegrafConfig.Config)
vars := sum.Variables
require.Len(t, vars, 1)
assert.Equal(t, "query var", vars[0].Name)
hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1")
varArgs := vars[0].Arguments
require.NotNil(t, varArgs)
assert.Equal(t, "query", varArgs.Type)
assert.Equal(t, influxdb.VariableQueryValues{
Query: "buckets() |> filter(fn: (r) => r.name !~ /^_/) |> rename(columns: {name: \"_value\"}) |> keep(columns: [\"_value\"])",
Language: "flux",
}, varArgs.Values)
newSumMapping := func(id pkger.SafeID, pkgName, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping {
return pkger.SummaryLabelMapping{
Status: pkger.StateStatusNew,
ResourceID: id,
ResourceType: rt,
ResourcePkgName: pkgName,
ResourceName: name,
LabelPkgName: labels[0].PkgName,
LabelName: labels[0].Name,
LabelID: labels[0].ID,
}
}
mappings := sum.LabelMappings
require.Len(t, mappings, 11)
assert.Contains(t, mappings, newSumMapping(bkts[0].ID, bkts[0].PkgName, bkts[0].Name, influxdb.BucketsResourceType))
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType))
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType))
//hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType))
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType))
//hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType))
//hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType))
//hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType))
//hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
})
t.Run("filtered by resource types", func(t *testing.T) {
@ -883,7 +934,7 @@ spec:
t.Run("pkg with same bkt-var-label does nto create new resources for them", func(t *testing.T) {
// validate the new package doesn't create new resources for bkts/labels/vars
// since names collide.
sum2, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t))
sum2, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)
require.Equal(t, sum1.Buckets, sum2.Buckets)
@ -901,7 +952,7 @@ spec:
pkg, err := pkger.Parse(pkger.EncodingYAML, pkger.FromString(pkgStr))
require.NoError(t, err)
sum, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg)
sum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg)
require.NoError(t, err)
return sum
}
@ -1095,7 +1146,7 @@ spec:
pkger.WithVariableSVC(l.VariableService(t)),
)
_, err = svc.Apply(ctx, l.Org.ID, 0, updatePkg)
_, _, err = svc.Apply(ctx, l.Org.ID, 0, updatePkg)
require.Error(t, err)
bkt, err := l.BucketService(t).FindBucketByID(ctx, influxdb.ID(sum1Bkts[0].ID))
@ -1175,7 +1226,7 @@ spec:
pkg, err := pkger.Parse(pkger.EncodingYAML, pkger.FromString(pkgStr))
require.NoError(t, err)
sum, err := svc.Apply(timedCtx(time.Second), l.Org.ID, l.User.ID, pkg)
sum, _, err := svc.Apply(timedCtx(time.Second), l.Org.ID, l.User.ID, pkg)
require.NoError(t, err)
require.Len(t, sum.Tasks, 1)
@ -1321,7 +1372,7 @@ spec:
}
assert.Equal(t, expectedMissingEnvs, sum.MissingEnvs)
sum, err = svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{
sum, _, err = svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{
"bkt-1-name-ref": "rucket_threeve",
"check-1-name-ref": "check_threeve",
"dash-1-name-ref": "dash_threeve",

View File

@ -7360,6 +7360,8 @@ components:
type: "string"
orgID:
type: "string"
pkgName:
type: string
name:
type: "string"
description:
@ -7377,12 +7379,18 @@ components:
items:
type: object
properties:
status:
type: string
resourcePkgName:
type: string
resourceName:
type: string
resourceID:
type: string
resourceType:
type: string
labelPkgName:
type: string
labelName:
type: string
labelID:
@ -7413,6 +7421,8 @@ components:
items:
type: object
properties:
pkgName:
type: string
name:
type: string
description:
@ -7460,6 +7470,8 @@ components:
items:
type: object
properties:
pkgName:
type: string
id:
type: string
name:
@ -7483,6 +7495,8 @@ components:
- $ref: "#/components/schemas/TelegrafRequest"
- type: object
properties:
pkgName:
type: string
labelAssociations:
type: array
items:
@ -7622,16 +7636,20 @@ components:
items:
type: object
properties:
isNew:
type: boolean
status:
type: string
resourceType:
type: string
resourceID:
type: string
resourcePkgName:
type: string
resourceName:
type: string
labelID:
type: string
labelPkgName:
type: string
labelName:
type: string
notificationEndpoints:

View File

@ -112,9 +112,8 @@ func (s *HTTPRemoteService) DryRun(ctx context.Context, orgID, userID influxdb.I
// Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied
// in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state
// from before the pkg was applied.
func (s *HTTPRemoteService) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) {
sum, _, err := s.apply(ctx, orgID, pkg, false, opts...)
return sum, err
func (s *HTTPRemoteService) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) {
return s.apply(ctx, orgID, pkg, false, opts...)
}
func (s *HTTPRemoteService) apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg, dryRun bool, opts ...ApplyOptFn) (Summary, Diff, error) {

View File

@ -355,22 +355,21 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {
ApplyWithStackID(stackID),
}
sum, diff, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
if IsParseErr(err) {
s.api.Respond(w, http.StatusUnprocessableEntity, RespApplyPkg{
Diff: diff,
Summary: sum,
Errors: convertParseErr(err),
})
return
}
if err != nil {
s.api.Err(w, err)
return
}
// if only a dry run, then we exit before anything destructive
if reqBody.DryRun {
sum, diff, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
if IsParseErr(err) {
s.api.Respond(w, http.StatusUnprocessableEntity, RespApplyPkg{
Diff: diff,
Summary: sum,
Errors: convertParseErr(err),
})
return
}
if err != nil {
s.api.Err(w, err)
return
}
s.api.Respond(w, http.StatusOK, RespApplyPkg{
Diff: diff,
Summary: sum,
@ -380,7 +379,7 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {
applyOpts = append(applyOpts, ApplyWithSecrets(reqBody.Secrets))
sum, err = s.svc.Apply(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
sum, diff, err := s.svc.Apply(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
if err != nil && !IsParseErr(err) {
s.api.Err(w, err)
return

View File

@ -407,11 +407,13 @@ func TestPkgerHTTPServer(t *testing.T) {
t.Run("apply a pkg", func(t *testing.T) {
svc := &fakeSVC{
dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if err := pkg.Validate(); err != nil {
return pkger.Summary{}, pkger.Diff{}, err
applyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}
sum := pkg.Summary()
var diff pkger.Diff
for _, b := range sum.Buckets {
diff.Buckets = append(diff.Buckets, pkger.DiffBucket{
@ -420,18 +422,10 @@ func TestPkgerHTTPServer(t *testing.T) {
},
})
}
return sum, diff, nil
},
applyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) {
var opt pkger.ApplyOpt
for _, o := range opts {
o(&opt)
}
sum := pkg.Summary()
for key := range opt.MissingSecrets {
sum.MissingSecrets = append(sum.MissingSecrets, key)
}
return sum, nil
return sum, diff, nil
},
}
@ -647,7 +641,7 @@ func decodeBody(t *testing.T, r io.Reader, v interface{}) {
type fakeSVC struct {
initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error)
dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error)
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
}
var _ pkger.SVC = (*fakeSVC)(nil)
@ -671,7 +665,7 @@ func (f *fakeSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pk
return f.dryRunFn(ctx, orgID, userID, pkg, opts...)
}
func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) {
func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) {
if f.applyFn == nil {
panic("not implemented")
}

View File

@ -8,7 +8,6 @@ import (
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -193,7 +192,7 @@ type (
DiffIdentifier
New DiffBucketValues `json:"new"`
Old *DiffBucketValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil
Old *DiffBucketValues `json:"old"`
}
// DiffBucketValues are the varying values for a bucket.
@ -204,32 +203,6 @@ type (
}
)
func newDiffBucket(b *bucket, i *influxdb.Bucket) DiffBucket {
diff := DiffBucket{
DiffIdentifier: DiffIdentifier{
ID: SafeID(b.ID()),
Remove: b.shouldRemove,
PkgName: b.PkgName(),
},
New: DiffBucketValues{
Name: b.Name(),
Description: b.Description,
RetentionRules: b.RetentionRules,
},
}
if i != nil {
diff.ID = SafeID(i.ID)
diff.Old = &DiffBucketValues{
Name: i.Name,
Description: i.Description,
}
if i.RetentionPeriod > 0 {
diff.Old.RetentionRules = retentionRules{newRetentionRule(i.RetentionPeriod)}
}
}
return diff
}
func (d DiffBucket) hasConflict() bool {
return !d.IsNew() && d.Old != nil && !reflect.DeepEqual(*d.Old, d.New)
}
@ -363,7 +336,7 @@ type (
DiffIdentifier
New DiffLabelValues `json:"new"`
Old *DiffLabelValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil
Old *DiffLabelValues `json:"old"`
}
// DiffLabelValues are the varying values for a label.
@ -374,48 +347,39 @@ type (
}
)
func newDiffLabel(l *label, i *influxdb.Label) DiffLabel {
diff := DiffLabel{
DiffIdentifier: DiffIdentifier{
ID: SafeID(l.ID()),
Remove: l.shouldRemove,
PkgName: l.PkgName(),
},
New: DiffLabelValues{
Name: l.Name(),
Color: l.Color,
Description: l.Description,
},
}
if i != nil {
diff.ID = SafeID(i.ID)
diff.Old = &DiffLabelValues{
Name: i.Name,
Color: i.Properties["color"],
Description: i.Properties["description"],
}
}
return diff
}
func (d DiffLabel) hasConflict() bool {
return !d.IsNew() && d.Old != nil && *d.Old != d.New
}
// StateStatus indicates the status of a diff or summary resource
type StateStatus string
const (
StateStatusExists StateStatus = "exists"
StateStatusNew StateStatus = "new"
StateStatusRemove StateStatus = "remove"
)
// DiffLabelMapping is a diff of an individual label mapping. A
// single resource may have multiple mappings to multiple labels.
// A label can have many mappings to other resources.
type DiffLabelMapping struct {
IsNew bool `json:"isNew"`
StateStatus StateStatus `json:"stateStatus"`
ResType influxdb.ResourceType `json:"resourceType"`
ResID SafeID `json:"resourceID"`
ResName string `json:"resourceName"`
ResType influxdb.ResourceType `json:"resourceType"`
ResID SafeID `json:"resourceID"`
ResName string `json:"resourceName"`
ResPkgName string `json:"resourcePkgName"`
LabelID SafeID `json:"labelID"`
LabelName string `json:"labelName"`
LabelID SafeID `json:"labelID"`
LabelName string `json:"labelName"`
LabelPkgName string `json:"labelPkgName"`
}
//func (d DiffLabelMapping) IsNew() bool {
// return d.StateStatus == StateStatusNew
//}
// DiffNotificationEndpointValues are the varying values for a notification endpoint.
type DiffNotificationEndpointValues struct {
influxdb.NotificationEndpoint
@ -747,6 +711,7 @@ func (s *SummaryCheck) UnmarshalJSON(b []byte) error {
type SummaryDashboard struct {
ID SafeID `json:"id"`
OrgID SafeID `json:"orgID"`
PkgName string `json:"pkgName"`
Name string `json:"name"`
Description string `json:"description"`
Charts []SummaryChart `json:"charts"`
@ -868,6 +833,7 @@ func (s *SummaryNotificationEndpoint) UnmarshalJSON(b []byte) error {
type (
SummaryNotificationRule struct {
ID SafeID `json:"id"`
PkgName string `json:"pkgName"`
Name string `json:"name"`
Description string `json:"description"`
@ -911,17 +877,21 @@ type SummaryLabel struct {
// SummaryLabelMapping provides a summary of a label mapped with a single resource.
type SummaryLabelMapping struct {
exists bool
ResourceID SafeID `json:"resourceID"`
ResourceName string `json:"resourceName"`
ResourceType influxdb.ResourceType `json:"resourceType"`
LabelName string `json:"labelName"`
LabelID SafeID `json:"labelID"`
exists bool
Status StateStatus `json:"status,omitempty"`
ResourceID SafeID `json:"resourceID"`
ResourcePkgName string `json:"resourcePkgName"`
ResourceName string `json:"resourceName"`
ResourceType influxdb.ResourceType `json:"resourceType"`
LabelPkgName string `json:"labelPkgName"`
LabelName string `json:"labelName"`
LabelID SafeID `json:"labelID"`
}
// SummaryTask provides a summary of a task.
type SummaryTask struct {
ID SafeID `json:"id"`
PkgName string `json:"pkgName"`
Name string `json:"name"`
Cron string `json:"cron"`
Description string `json:"description"`
@ -935,6 +905,7 @@ type SummaryTask struct {
// SummaryTelegraf provides a summary of a pkg telegraf config.
type SummaryTelegraf struct {
PkgName string `json:"pkgName"`
TelegrafConfig influxdb.TelegrafConfig `json:"telegrafConfig"`
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
@ -950,195 +921,6 @@ type SummaryVariable struct {
LabelAssociations []SummaryLabel `json:"labelAssociations"`
}
type identity struct {
name *references
displayName *references
shouldRemove bool
}
func (i *identity) Name() string {
if displayName := i.displayName.String(); displayName != "" {
return displayName
}
return i.name.String()
}
func (i *identity) PkgName() string {
return i.name.String()
}
const (
fieldAPIVersion = "apiVersion"
fieldAssociations = "associations"
fieldDescription = "description"
fieldEvery = "every"
fieldKey = "key"
fieldKind = "kind"
fieldLanguage = "language"
fieldLevel = "level"
fieldMin = "min"
fieldMax = "max"
fieldMetadata = "metadata"
fieldName = "name"
fieldOffset = "offset"
fieldOperator = "operator"
fieldPrefix = "prefix"
fieldQuery = "query"
fieldSuffix = "suffix"
fieldSpec = "spec"
fieldStatus = "status"
fieldType = "type"
fieldValue = "value"
fieldValues = "values"
)
const (
fieldBucketRetentionRules = "retentionRules"
)
const bucketNameMinLength = 2
type bucket struct {
identity
id influxdb.ID
OrgID influxdb.ID
Description string
RetentionRules retentionRules
labels sortedLabels
// existing provides context for a resource that already
// exists in the platform. If a resource already exists
// then it will be referenced here.
existing *influxdb.Bucket
}
func (b *bucket) ID() influxdb.ID {
if b.existing != nil {
return b.existing.ID
}
return b.id
}
func (b *bucket) Labels() []*label {
return b.labels
}
func (b *bucket) ResourceType() influxdb.ResourceType {
return KindBucket.ResourceType()
}
func (b *bucket) Exists() bool {
return b.existing != nil
}
func (b *bucket) summarize() SummaryBucket {
return SummaryBucket{
ID: SafeID(b.ID()),
OrgID: SafeID(b.OrgID),
Name: b.Name(),
PkgName: b.PkgName(),
Description: b.Description,
RetentionPeriod: b.RetentionRules.RP(),
LabelAssociations: toSummaryLabels(b.labels...),
}
}
func (b *bucket) valid() []validationErr {
var vErrs []validationErr
if err, ok := isValidName(b.Name(), bucketNameMinLength); !ok {
vErrs = append(vErrs, err)
}
vErrs = append(vErrs, b.RetentionRules.valid()...)
if len(vErrs) == 0 {
return nil
}
return []validationErr{
objectValidationErr(fieldSpec, vErrs...),
}
}
func (b *bucket) shouldApply() bool {
return b.shouldRemove ||
b.existing == nil ||
b.Description != b.existing.Description ||
b.Name() != b.existing.Name ||
b.RetentionRules.RP() != b.existing.RetentionPeriod
}
type mapperBuckets []*bucket
func (b mapperBuckets) Association(i int) labelAssociater {
return b[i]
}
func (b mapperBuckets) Len() int {
return len(b)
}
const (
retentionRuleTypeExpire = "expire"
)
type retentionRule struct {
Type string `json:"type" yaml:"type"`
Seconds int `json:"everySeconds" yaml:"everySeconds"`
}
func newRetentionRule(d time.Duration) retentionRule {
return retentionRule{
Type: retentionRuleTypeExpire,
Seconds: int(d.Round(time.Second) / time.Second),
}
}
func (r retentionRule) valid() []validationErr {
const hour = 3600
var ff []validationErr
if r.Seconds < hour {
ff = append(ff, validationErr{
Field: fieldRetentionRulesEverySeconds,
Msg: "seconds must be a minimum of " + strconv.Itoa(hour),
})
}
if r.Type != retentionRuleTypeExpire {
ff = append(ff, validationErr{
Field: fieldType,
Msg: `type must be "expire"`,
})
}
return ff
}
const (
fieldRetentionRulesEverySeconds = "everySeconds"
)
type retentionRules []retentionRule
func (r retentionRules) RP() time.Duration {
// TODO: this feels very odd to me, will need to follow up with
// team to better understand this
for _, rule := range r {
return time.Duration(rule.Seconds) * time.Second
}
return 0
}
func (r retentionRules) valid() []validationErr {
var failures []validationErr
for i, rule := range r {
if ff := rule.valid(); len(ff) > 0 {
failures = append(failures, validationErr{
Field: fieldBucketRetentionRules,
Index: intPtr(i),
Nested: ff,
})
}
}
return failures
}
type checkKind int
const (
@ -1389,180 +1171,6 @@ func toInfluxThresholds(thresholds ...threshold) []icheck.ThresholdConfig {
return iThresh
}
type assocMapKey struct {
resType influxdb.ResourceType
name string
}
type assocMapVal struct {
exists bool
v interface{}
}
func (l assocMapVal) ID() influxdb.ID {
if t, ok := l.v.(labelAssociater); ok {
return t.ID()
}
return 0
}
type associationMapping struct {
mappings map[assocMapKey][]assocMapVal
}
func (l *associationMapping) setMapping(v interface {
ResourceType() influxdb.ResourceType
Name() string
}, exists bool) {
if l == nil {
return
}
if l.mappings == nil {
l.mappings = make(map[assocMapKey][]assocMapVal)
}
k := assocMapKey{
resType: v.ResourceType(),
name: v.Name(),
}
val := assocMapVal{
exists: exists,
v: v,
}
existing, ok := l.mappings[k]
if !ok {
l.mappings[k] = []assocMapVal{val}
return
}
for i, ex := range existing {
if ex.v == v {
existing[i].exists = exists
return
}
}
l.mappings[k] = append(l.mappings[k], val)
}
const (
fieldLabelColor = "color"
)
const labelNameMinLength = 2
type label struct {
identity
id influxdb.ID
OrgID influxdb.ID
Color string
Description string
associationMapping
// exists provides context for a resource that already
// exists in the platform. If a resource already exists(exists=true)
// then the ID should be populated.
existing *influxdb.Label
}
func (l *label) ID() influxdb.ID {
if l.existing != nil {
return l.existing.ID
}
return l.id
}
func (l *label) shouldApply() bool {
return l.existing == nil ||
l.Description != l.existing.Properties["description"] ||
l.Name() != l.existing.Name ||
l.Color != l.existing.Properties["color"]
}
func (l *label) summarize() SummaryLabel {
return SummaryLabel{
ID: SafeID(l.ID()),
OrgID: SafeID(l.OrgID),
PkgName: l.PkgName(),
Name: l.Name(),
Properties: struct {
Color string `json:"color"`
Description string `json:"description"`
}{
Color: l.Color,
Description: l.Description,
},
}
}
func (l *label) mappingSummary() []SummaryLabelMapping {
var mappings []SummaryLabelMapping
for resource, vals := range l.mappings {
for _, v := range vals {
mappings = append(mappings, SummaryLabelMapping{
exists: v.exists,
ResourceID: SafeID(v.ID()),
ResourceName: resource.name,
ResourceType: resource.resType,
LabelID: SafeID(l.ID()),
LabelName: l.Name(),
})
}
}
return mappings
}
func (l *label) properties() map[string]string {
return map[string]string{
"color": l.Color,
"description": l.Description,
}
}
func (l *label) toInfluxLabel() influxdb.Label {
return influxdb.Label{
ID: l.ID(),
OrgID: l.OrgID,
Name: l.Name(),
Properties: l.properties(),
}
}
func (l *label) valid() []validationErr {
var vErrs []validationErr
if err, ok := isValidName(l.Name(), labelNameMinLength); !ok {
vErrs = append(vErrs, err)
}
if len(vErrs) == 0 {
return nil
}
return []validationErr{
objectValidationErr(fieldSpec, vErrs...),
}
}
func toSummaryLabels(labels ...*label) []SummaryLabel {
iLabels := make([]SummaryLabel, 0, len(labels))
for _, l := range labels {
iLabels = append(iLabels, l.summarize())
}
return iLabels
}
type sortedLabels []*label
func (s sortedLabels) Len() int {
return len(s)
}
func (s sortedLabels) Less(i, j int) bool {
return s[i].Name() < s[j].Name()
}
func (s sortedLabels) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type notificationKind int
const (
@ -1858,6 +1466,7 @@ func (r *notificationRule) Status() influxdb.Status {
func (r *notificationRule) summarize() SummaryNotificationRule {
return SummaryNotificationRule{
ID: SafeID(r.ID()),
PkgName: r.PkgName(),
Name: r.Name(),
EndpointID: SafeID(r.endpointID),
EndpointName: r.endpointName.String(),
@ -2133,6 +1742,7 @@ func (t *task) flux() string {
func (t *task) summarize() SummaryTask {
return SummaryTask{
ID: SafeID(t.ID()),
PkgName: t.PkgName(),
Name: t.Name(),
Cron: t.cron,
Description: t.description,
@ -2230,6 +1840,7 @@ func (t *telegraf) summarize() SummaryTelegraf {
cfg := t.config
cfg.Name = t.Name()
return SummaryTelegraf{
PkgName: t.PkgName(),
TelegrafConfig: cfg,
LabelAssociations: toSummaryLabels(t.labels...),
}
@ -2441,6 +2052,7 @@ func (d *dashboard) summarize() SummaryDashboard {
iDash := SummaryDashboard{
ID: SafeID(d.ID()),
OrgID: SafeID(d.OrgID),
PkgName: d.PkgName(),
Name: d.Name(),
Description: d.Description,
LabelAssociations: toSummaryLabels(d.labels...),

View File

@ -16,16 +16,12 @@ func TestPkg(t *testing.T) {
pkg := Pkg{
mBuckets: map[string]*bucket{
"buck_2": {
id: influxdb.ID(2),
OrgID: influxdb.ID(100),
Description: "desc2",
identity: identity{name: &references{val: "name2"}},
identity: identity{name: &references{val: "pkgName2"}, displayName: &references{val: "name2"}},
RetentionRules: retentionRules{newRetentionRule(2 * time.Hour)},
},
"buck_1": {
id: influxdb.ID(1),
OrgID: influxdb.ID(100),
identity: identity{name: &references{val: "name1"}},
identity: identity{name: &references{val: "pkgName1"}, displayName: &references{val: "name1"}},
Description: "desc1",
RetentionRules: retentionRules{newRetentionRule(time.Hour)},
},
@ -33,13 +29,13 @@ func TestPkg(t *testing.T) {
}
summary := pkg.Summary()
require.Len(t, summary.Buckets, len(pkg.mBuckets))
for i := 1; i <= len(summary.Buckets); i++ {
buck := summary.Buckets[i-1]
assert.Equal(t, SafeID(i), buck.ID)
assert.Equal(t, SafeID(100), buck.OrgID)
assert.Zero(t, buck.ID)
assert.Zero(t, buck.OrgID)
assert.Equal(t, "desc"+strconv.Itoa(i), buck.Description)
assert.Equal(t, "pkgName"+strconv.Itoa(i), buck.PkgName)
assert.Equal(t, "name"+strconv.Itoa(i), buck.Name)
assert.Equal(t, time.Duration(i)*time.Hour, buck.RetentionPeriod)
}
@ -49,16 +45,12 @@ func TestPkg(t *testing.T) {
pkg := Pkg{
mLabels: map[string]*label{
"2": {
id: influxdb.ID(2),
OrgID: influxdb.ID(100),
identity: identity{name: &references{val: "name2"}},
identity: identity{name: &references{val: "pkgName2"}, displayName: &references{val: "name2"}},
Description: "desc2",
Color: "blurple",
},
"1": {
id: influxdb.ID(1),
OrgID: influxdb.ID(100),
identity: identity{name: &references{val: "name1"}},
identity: identity{name: &references{val: "pkgName1"}, displayName: &references{val: "name1"}},
Description: "desc1",
Color: "peru",
},
@ -69,29 +61,24 @@ func TestPkg(t *testing.T) {
require.Len(t, summary.Labels, len(pkg.mLabels))
label1 := summary.Labels[0]
assert.Equal(t, SafeID(1), label1.ID)
assert.Equal(t, SafeID(100), label1.OrgID)
assert.Equal(t, "pkgName1", label1.PkgName)
assert.Equal(t, "name1", label1.Name)
assert.Equal(t, "desc1", label1.Properties.Description)
assert.Equal(t, "peru", label1.Properties.Color)
label2 := summary.Labels[1]
assert.Equal(t, SafeID(2), label2.ID)
assert.Equal(t, SafeID(100), label2.OrgID)
assert.Equal(t, "desc2", label2.Properties.Description)
assert.Equal(t, "pkgName2", label2.PkgName)
assert.Equal(t, "name2", label2.Name)
assert.Equal(t, "desc2", label2.Properties.Description)
assert.Equal(t, "blurple", label2.Properties.Color)
})
t.Run("label mappings returned in asc order by name", func(t *testing.T) {
bucket1 := &bucket{
id: influxdb.ID(20),
identity: identity{name: &references{val: "b1"}},
identity: identity{name: &references{val: "pkgBucket1"}, displayName: &references{val: "bd1"}},
}
label1 := &label{
id: influxdb.ID(2),
OrgID: influxdb.ID(100),
identity: identity{name: &references{val: "name2"}},
identity: identity{name: &references{val: "pkgLabel2"}, displayName: &references{val: "name2"}},
Description: "desc2",
Color: "blurple",
associationMapping: associationMapping{
@ -108,18 +95,18 @@ func TestPkg(t *testing.T) {
bucket1.labels = append(bucket1.labels, label1)
pkg := Pkg{
mBuckets: map[string]*bucket{bucket1.Name(): bucket1},
mLabels: map[string]*label{label1.Name(): label1},
mBuckets: map[string]*bucket{bucket1.PkgName(): bucket1},
mLabels: map[string]*label{label1.PkgName(): label1},
}
summary := pkg.Summary()
require.Len(t, summary.LabelMappings, 1)
mapping1 := summary.LabelMappings[0]
assert.Equal(t, SafeID(bucket1.id), mapping1.ResourceID)
assert.Equal(t, bucket1.PkgName(), mapping1.ResourcePkgName)
assert.Equal(t, bucket1.Name(), mapping1.ResourceName)
assert.Equal(t, influxdb.BucketsResourceType, mapping1.ResourceType)
assert.Equal(t, SafeID(label1.id), mapping1.LabelID)
assert.Equal(t, label1.PkgName(), mapping1.LabelPkgName)
assert.Equal(t, label1.Name(), mapping1.LabelName)
})
})
@ -489,11 +476,6 @@ func TestPkg(t *testing.T) {
kind Kind
validName string
}{
{
pkgFile: "testdata/bucket.yml",
kind: KindBucket,
validName: "rucket_11",
},
{
pkgFile: "testdata/checks.yml",
kind: KindCheck,

View File

@ -257,8 +257,7 @@ type Pkg struct {
mEnvVals map[string]string
mSecrets map[string]bool
isVerified bool // dry run has verified pkg resources with existing resources
isParsed bool // indicates the pkg has been parsed and all resources graphed accordingly
isParsed bool // indicates the pkg has been parsed and all resources graphed accordingly
}
// Encode is a helper for encoding the pkg correctly.
@ -302,21 +301,13 @@ func (p *Pkg) Summary() Summary {
NotificationRules: []SummaryNotificationRule{},
Labels: []SummaryLabel{},
MissingEnvs: p.missingEnvRefs(),
MissingSecrets: []string{},
MissingSecrets: p.missingSecrets(),
Tasks: []SummaryTask{},
TelegrafConfigs: []SummaryTelegraf{},
Variables: []SummaryVariable{},
}
// only add this after dry run has been completed
if p.isVerified {
sum.MissingSecrets = p.missingSecrets()
}
for _, b := range p.buckets() {
if b.shouldRemove {
continue
}
sum.Buckets = append(sum.Buckets, b.summarize())
}
@ -418,11 +409,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) {
}
switch k {
case KindBucket:
p.mBuckets[pkgName] = &bucket{
identity: newIdentity,
id: id,
}
case KindCheck, KindCheckDeadman, KindCheckThreshold:
p.mChecks[pkgName] = &check{
identity: newIdentity,
@ -471,11 +457,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) {
func (p *Pkg) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool) {
switch k {
case KindBucket:
b, ok := p.mBuckets[pkgName]
return func(id influxdb.ID) {
b.id = id
}, ok
case KindCheck, KindCheckDeadman, KindCheckThreshold:
ch, ok := p.mChecks[pkgName]
return func(id influxdb.ID) {

321
pkger/parser_models.go Normal file
View File

@ -0,0 +1,321 @@
package pkger
import (
"strconv"
"time"
"github.com/influxdata/influxdb/v2"
)
// identity carries the naming state shared by all parsed pkg resources:
// the immutable pkg name plus an optional display-name override.
type identity struct {
	// name is the resource's pkg (metadata) name; it is the stable identifier.
	name *references
	// displayName, when set, overrides name for user-facing output.
	displayName *references
	// shouldRemove marks the resource for removal on apply.
	shouldRemove bool
}
// Name returns the user-facing name, preferring the display-name override
// and falling back to the pkg name when no override is set.
func (i *identity) Name() string {
	name := i.displayName.String()
	if name == "" {
		name = i.name.String()
	}
	return name
}
// PkgName returns the immutable name that identifies the resource within the pkg.
func (i *identity) PkgName() string {
	return i.name.String()
}
// Field name constants shared by the pkg parser across resource kinds.
const (
	fieldAPIVersion   = "apiVersion"
	fieldAssociations = "associations"
	fieldDescription  = "description"
	fieldEvery        = "every"
	fieldKey          = "key"
	fieldKind         = "kind"
	fieldLanguage     = "language"
	fieldLevel        = "level"
	fieldMin          = "min"
	fieldMax          = "max"
	fieldMetadata     = "metadata"
	fieldName         = "name"
	fieldOffset       = "offset"
	fieldOperator     = "operator"
	fieldPrefix       = "prefix"
	fieldQuery        = "query"
	fieldSuffix       = "suffix"
	fieldSpec         = "spec"
	fieldStatus       = "status"
	fieldType         = "type"
	fieldValue        = "value"
	fieldValues       = "values"
)
// Field name constants specific to bucket resources.
const (
	fieldBucketRetentionRules = "retentionRules"
)

// bucketNameMinLength is the minimum number of characters for a valid bucket name.
const bucketNameMinLength = 2
// bucket is the parsed representation of a pkg bucket resource.
type bucket struct {
	identity

	// Description is the human-readable description for the bucket.
	Description string
	// RetentionRules holds the bucket's parsed retention configuration.
	RetentionRules retentionRules
	// labels are the label associations declared for this bucket.
	labels sortedLabels
}
// summarize converts the parsed bucket into its SummaryBucket form.
func (b *bucket) summarize() SummaryBucket {
	sum := SummaryBucket{
		Name:              b.Name(),
		PkgName:           b.PkgName(),
		Description:       b.Description,
		RetentionPeriod:   b.RetentionRules.RP(),
		LabelAssociations: toSummaryLabels(b.labels...),
	}
	return sum
}
// ResourceType returns the influxdb resource type for buckets.
func (b *bucket) ResourceType() influxdb.ResourceType {
	return KindBucket.ResourceType()
}
// valid returns the validation failures for the bucket, wrapped under the
// spec field; a nil result means the bucket is valid.
func (b *bucket) valid() []validationErr {
	var failures []validationErr
	if err, ok := isValidName(b.Name(), bucketNameMinLength); !ok {
		failures = append(failures, err)
	}
	failures = append(failures, b.RetentionRules.valid()...)
	if len(failures) > 0 {
		return []validationErr{
			objectValidationErr(fieldSpec, failures...),
		}
	}
	return nil
}
const (
	// retentionRuleTypeExpire is the only retention rule type supported.
	retentionRuleTypeExpire = "expire"
)

// retentionRule is a single bucket retention rule as declared in a pkg.
type retentionRule struct {
	// Type must be retentionRuleTypeExpire; any other value fails validation.
	Type string `json:"type" yaml:"type"`
	// Seconds is the retention period; validation requires at least one hour.
	Seconds int `json:"everySeconds" yaml:"everySeconds"`
}
// newRetentionRule builds an "expire" retention rule from a duration,
// rounding the duration to the nearest second.
func newRetentionRule(d time.Duration) retentionRule {
	secs := d.Round(time.Second) / time.Second
	return retentionRule{
		Type:    retentionRuleTypeExpire,
		Seconds: int(secs),
	}
}
// valid returns validation failures for the rule: the period must be at
// least one hour and the type must be "expire". An empty result means valid.
func (r retentionRule) valid() []validationErr {
	const hour = 3600

	var failures []validationErr
	if r.Seconds < hour {
		failures = append(failures, validationErr{
			Field: fieldRetentionRulesEverySeconds,
			Msg:   "seconds must be a minimum of " + strconv.Itoa(hour),
		})
	}
	if r.Type != retentionRuleTypeExpire {
		failures = append(failures, validationErr{
			Field: fieldType,
			Msg:   `type must be "expire"`,
		})
	}
	return failures
}
const (
	// fieldRetentionRulesEverySeconds is the field name for a rule's period.
	fieldRetentionRulesEverySeconds = "everySeconds"
)

// retentionRules is the collection of retention rules declared for a bucket.
type retentionRules []retentionRule
// RP returns the retention period derived from the first rule, or zero
// when no rules are present.
func (r retentionRules) RP() time.Duration {
	// TODO: this feels very odd to me, will need to follow up with
	// team to better understand this
	if len(r) == 0 {
		return 0
	}
	return time.Duration(r[0].Seconds) * time.Second
}
// valid validates every rule in the collection, nesting each rule's
// failures under the retentionRules field with its index.
func (r retentionRules) valid() []validationErr {
	var failures []validationErr
	for i := range r {
		ff := r[i].valid()
		if len(ff) == 0 {
			continue
		}
		failures = append(failures, validationErr{
			Field:  fieldBucketRetentionRules,
			Index:  intPtr(i),
			Nested: ff,
		})
	}
	return failures
}
// assocMapKey uniquely identifies an association target by its resource
// type and name.
type assocMapKey struct {
	resType influxdb.ResourceType
	name    string
}

// assocMapVal pairs an associated resource with whether that association
// already exists in the platform.
type assocMapVal struct {
	exists bool
	// v is the associated resource; its concrete type varies by resource kind.
	v interface{}
}
// ID returns the associated resource's platform ID when the wrapped value
// is a labelAssociater, and zero otherwise.
func (l assocMapVal) ID() influxdb.ID {
	la, ok := l.v.(labelAssociater)
	if !ok {
		return 0
	}
	return la.ID()
}
// PkgName returns the associated resource's pkg name when the wrapped
// value exposes one, and the empty string otherwise.
func (l assocMapVal) PkgName() string {
	if named, ok := l.v.(interface{ PkgName() string }); ok {
		return named.PkgName()
	}
	return ""
}
// associationMapping tracks, per resource (type, name), the resources
// associated with a label and whether each association already exists.
type associationMapping struct {
	mappings map[assocMapKey][]assocMapVal
}
// setMapping records an association for v, lazily initializing the map.
// If v was already recorded under the same (type, name) key, only its
// exists flag is updated; otherwise a new entry is appended.
func (l *associationMapping) setMapping(v interface {
	ResourceType() influxdb.ResourceType
	Name() string
}, exists bool) {
	if l == nil {
		return
	}
	if l.mappings == nil {
		l.mappings = make(map[assocMapKey][]assocMapVal)
	}

	key := assocMapKey{
		resType: v.ResourceType(),
		name:    v.Name(),
	}
	entry := assocMapVal{
		exists: exists,
		v:      v,
	}

	vals, ok := l.mappings[key]
	if !ok {
		l.mappings[key] = []assocMapVal{entry}
		return
	}
	for i := range vals {
		if vals[i].v == v {
			// already tracked; just refresh the exists flag.
			vals[i].exists = exists
			return
		}
	}
	l.mappings[key] = append(vals, entry)
}
const (
	// fieldLabelColor is the field name for a label's color property.
	fieldLabelColor = "color"
)

// labelNameMinLength is the minimum number of characters for a valid label name.
const labelNameMinLength = 2

// label is the parsed representation of a pkg label resource.
type label struct {
	// id is the platform ID, when known (e.g. from stack state).
	id influxdb.ID
	identity

	// Color is the label's color property.
	Color string
	// Description is the label's description property.
	Description string
	associationMapping

	// existing provides context for a resource that already
	// exists in the platform. If a resource already exists
	// then the ID should be populated.
	existing *influxdb.Label
}
// summarize converts the parsed label into its SummaryLabel form.
func (l *label) summarize() SummaryLabel {
	props := struct {
		Color       string `json:"color"`
		Description string `json:"description"`
	}{
		Color:       l.Color,
		Description: l.Description,
	}
	return SummaryLabel{
		PkgName:    l.PkgName(),
		Name:       l.Name(),
		Properties: props,
	}
}
// mappingSummary returns one SummaryLabelMapping per resource association
// recorded against this label, marking each as new or existing.
func (l *label) mappingSummary() []SummaryLabelMapping {
	var out []SummaryLabelMapping
	for key, vals := range l.mappings {
		for _, val := range vals {
			status := StateStatusNew
			if val.exists {
				status = StateStatusExists
			}
			out = append(out, SummaryLabelMapping{
				exists:          val.exists,
				Status:          status,
				ResourceID:      SafeID(val.ID()),
				ResourcePkgName: val.PkgName(),
				ResourceName:    key.name,
				ResourceType:    key.resType,
				LabelID:         SafeID(l.ID()),
				LabelPkgName:    l.PkgName(),
				LabelName:       l.Name(),
			})
		}
	}
	return out
}
// ID returns the label's platform ID, preferring the explicitly assigned
// id, then any existing platform resource, and finally zero when unknown.
func (l *label) ID() influxdb.ID {
	switch {
	case l.id != 0:
		return l.id
	case l.existing != nil:
		return l.existing.ID
	default:
		return 0
	}
}
// valid returns the validation failures for the label, wrapped under the
// spec field; a nil result means the label is valid.
func (l *label) valid() []validationErr {
	var failures []validationErr
	if err, ok := isValidName(l.Name(), labelNameMinLength); !ok {
		failures = append(failures, err)
	}
	if len(failures) > 0 {
		return []validationErr{
			objectValidationErr(fieldSpec, failures...),
		}
	}
	return nil
}
// toSummaryLabels summarizes each label in order, always returning a
// non-nil (possibly empty) slice.
func toSummaryLabels(labels ...*label) []SummaryLabel {
	out := make([]SummaryLabel, 0, len(labels))
	for _, l := range labels {
		out = append(out, l.summarize())
	}
	return out
}
// sortedLabels implements sort.Interface, ordering labels by display name
// in ascending order.
type sortedLabels []*label

func (s sortedLabels) Len() int {
	return len(s)
}

func (s sortedLabels) Less(i, j int) bool {
	return s[i].Name() < s[j].Name()
}

func (s sortedLabels) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

View File

@ -339,27 +339,35 @@ spec:
expectedMappings := []SummaryLabelMapping{
{
ResourceName: "rucket_1",
LabelName: "label_1",
ResourcePkgName: "rucket_1",
ResourceName: "rucket_1",
LabelPkgName: "label_1",
LabelName: "label_1",
},
{
ResourceName: "rucket_2",
LabelName: "label_2",
ResourcePkgName: "rucket_2",
ResourceName: "rucket_2",
LabelPkgName: "label_2",
LabelName: "label_2",
},
{
ResourceName: "rucket_3",
LabelName: "label_1",
ResourcePkgName: "rucket_3",
ResourceName: "rucket_3",
LabelPkgName: "label_1",
LabelName: "label_1",
},
{
ResourceName: "rucket_3",
LabelName: "label_2",
ResourcePkgName: "rucket_3",
ResourceName: "rucket_3",
LabelPkgName: "label_2",
LabelName: "label_2",
},
}
require.Len(t, sum.LabelMappings, len(expectedMappings))
for i, expected := range expectedMappings {
expected.ResourceType = influxdb.BucketsResourceType
assert.Equal(t, expected, sum.LabelMappings[i])
for _, expectedMapping := range expectedMappings {
expectedMapping.Status = StateStatusNew
expectedMapping.ResourceType = influxdb.BucketsResourceType
assert.Contains(t, sum.LabelMappings, expectedMapping)
}
})
})
@ -521,18 +529,25 @@ spec:
assert.True(t, deadmanCheck.ReportZero)
assert.Len(t, check2.LabelAssociations, 1)
containsLabelMappings(t, sum.LabelMappings,
labelMapping{
labelName: "label_1",
resName: "check_0",
resType: influxdb.ChecksResourceType,
expectedMappings := []SummaryLabelMapping{
{
LabelPkgName: "label_1",
LabelName: "label_1",
ResourcePkgName: "check_0",
ResourceName: "check_0",
},
labelMapping{
labelName: "label_1",
resName: "display name",
resType: influxdb.ChecksResourceType,
{
LabelPkgName: "label_1",
LabelName: "label_1",
ResourcePkgName: "check_1",
ResourceName: "display name",
},
)
}
for _, expected := range expectedMappings {
expected.Status = StateStatusNew
expected.ResourceType = influxdb.ChecksResourceType
assert.Contains(t, sum.LabelMappings, expected)
}
})
})
@ -2589,18 +2604,14 @@ spec:
actualLabel := actual.LabelAssociations[0]
assert.Equal(t, "label_1", actualLabel.Name)
expectedMappings := []SummaryLabelMapping{
{
ResourceName: "dash_1",
LabelName: "label_1",
},
}
require.Len(t, sum.LabelMappings, len(expectedMappings))
for i, expected := range expectedMappings {
expected.ResourceType = influxdb.DashboardsResourceType
assert.Equal(t, expected, sum.LabelMappings[i])
}
assert.Contains(t, sum.LabelMappings, SummaryLabelMapping{
Status: StateStatusNew,
ResourceType: influxdb.DashboardsResourceType,
ResourcePkgName: "dash_1",
ResourceName: "dash_1",
LabelPkgName: "label_1",
LabelName: "label_1",
})
})
})
@ -2696,6 +2707,7 @@ spec:
testfileRunner(t, "testdata/notification_endpoint", func(t *testing.T, pkg *Pkg) {
expectedEndpoints := []SummaryNotificationEndpoint{
{
PkgName: "http_basic_auth_notification_endpoint",
NotificationEndpoint: &endpoint.HTTP{
Base: endpoint.Base{
Name: "basic endpoint name",
@ -2710,6 +2722,7 @@ spec:
},
},
{
PkgName: "http_bearer_auth_notification_endpoint",
NotificationEndpoint: &endpoint.HTTP{
Base: endpoint.Base{
Name: "http_bearer_auth_notification_endpoint",
@ -2723,6 +2736,7 @@ spec:
},
},
{
PkgName: "http_none_auth_notification_endpoint",
NotificationEndpoint: &endpoint.HTTP{
Base: endpoint.Base{
Name: "http_none_auth_notification_endpoint",
@ -2735,6 +2749,7 @@ spec:
},
},
{
PkgName: "pager_duty_notification_endpoint",
NotificationEndpoint: &endpoint.PagerDuty{
Base: endpoint.Base{
Name: "pager duty name",
@ -2746,6 +2761,7 @@ spec:
},
},
{
PkgName: "slack_notification_endpoint",
NotificationEndpoint: &endpoint.Slack{
Base: endpoint.Base{
Name: "slack name",
@ -2769,10 +2785,13 @@ spec:
require.Len(t, actual.LabelAssociations, 1)
assert.Equal(t, "label_1", actual.LabelAssociations[0].Name)
containsLabelMappings(t, sum.LabelMappings, labelMapping{
labelName: "label_1",
resName: expected.NotificationEndpoint.GetName(),
resType: influxdb.NotificationEndpointResourceType,
assert.Contains(t, sum.LabelMappings, SummaryLabelMapping{
Status: StateStatusNew,
ResourceType: influxdb.NotificationEndpointResourceType,
ResourcePkgName: expected.PkgName,
ResourceName: expected.NotificationEndpoint.GetName(),
LabelPkgName: "label_1",
LabelName: "label_1",
})
}
})
@ -3529,9 +3548,12 @@ spec:
require.Len(t, sum.LabelMappings, 1)
expectedMapping := SummaryLabelMapping{
ResourceName: "display name",
LabelName: "label_1",
ResourceType: influxdb.TelegrafsResourceType,
Status: StateStatusNew,
ResourcePkgName: "first_tele_config",
ResourceName: "display name",
LabelPkgName: "label_1",
LabelName: "label_1",
ResourceType: influxdb.TelegrafsResourceType,
}
assert.Equal(t, expectedMapping, sum.LabelMappings[0])
})
@ -3803,8 +3825,11 @@ spec:
expectedMappings := []SummaryLabelMapping{
{
ResourceName: "var_1",
LabelName: "label_1",
Status: StateStatusNew,
ResourcePkgName: "var_1",
ResourceName: "var_1",
LabelPkgName: "label_1",
LabelName: "label_1",
},
}
@ -4406,25 +4431,6 @@ func testfileRunner(t *testing.T, path string, testFn func(t *testing.T, pkg *Pk
}
}
type labelMapping struct {
labelName string
resName string
resType influxdb.ResourceType
}
func containsLabelMappings(t *testing.T, labelMappings []SummaryLabelMapping, matches ...labelMapping) {
t.Helper()
for _, expected := range matches {
expectedMapping := SummaryLabelMapping{
ResourceName: expected.resName,
LabelName: expected.labelName,
ResourceType: expected.resType,
}
assert.Contains(t, labelMappings, expectedMapping)
}
}
func strPtr(s string) *string {
return &s
}

View File

@ -52,7 +52,7 @@ type SVC interface {
InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error)
CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error)
DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error)
Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error)
Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error)
}
// SVCMiddleware is a service middleware func.
@ -636,6 +636,11 @@ func (s *Service) filterOrgResourceKinds(resourceKindFilters []Kind) []struct {
// for later calls to Apply. This func will be run on an Apply if it has not been run
// already.
func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) {
sum, diff, _, err := s.dryRun(ctx, orgID, pkg, opts...)
return sum, diff, err
}
func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, *stateCoordinator, error) {
// so here's the deal, when we have issues with the parsing validation, we
// continue to do the diff anyhow. any resource that does not have a name
// will be skipped, and won't bleed into the dry run here. We can now return
@ -644,7 +649,7 @@ func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk
if !pkg.isParsed {
err := pkg.Validate()
if err != nil && !IsParseErr(err) {
return Summary{}, Diff{}, internalErr(err)
return Summary{}, Diff{}, nil, internalErr(err)
}
parseErr = err
}
@ -654,75 +659,74 @@ func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk
if len(opt.EnvRefs) > 0 {
err := pkg.applyEnvRefs(opt.EnvRefs)
if err != nil && !IsParseErr(err) {
return Summary{}, Diff{}, internalErr(err)
return Summary{}, Diff{}, nil, internalErr(err)
}
parseErr = err
}
state := newStateCoordinator(pkg)
if opt.StackID > 0 {
if err := s.addStackPkgState(ctx, opt.StackID, pkg); err != nil {
return Summary{}, Diff{}, internalErr(err)
if err := s.addStackState(ctx, opt.StackID, pkg, state); err != nil {
return Summary{}, Diff{}, nil, internalErr(err)
}
}
if err := s.dryRunSecrets(ctx, orgID, pkg); err != nil {
return Summary{}, Diff{}, err
return Summary{}, Diff{}, nil, err
}
diff := Diff{
Buckets: s.dryRunBuckets(ctx, orgID, pkg),
Checks: s.dryRunChecks(ctx, orgID, pkg),
Dashboards: s.dryRunDashboards(pkg),
Labels: s.dryRunLabels(ctx, orgID, pkg),
Tasks: s.dryRunTasks(pkg),
Telegrafs: s.dryRunTelegraf(pkg),
Variables: s.dryRunVariables(ctx, orgID, pkg),
}
s.dryRunBuckets(ctx, orgID, state.mBuckets)
s.dryRunLabels(ctx, orgID, state.mLabels)
var diff Diff
diff.Checks = s.dryRunChecks(ctx, orgID, pkg)
diff.Dashboards = s.dryRunDashboards(pkg)
diff.Tasks = s.dryRunTasks(pkg)
diff.Telegrafs = s.dryRunTelegraf(pkg)
diff.Variables = s.dryRunVariables(ctx, orgID, pkg)
diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg)
if err != nil {
return Summary{}, Diff{}, err
return Summary{}, Diff{}, nil, err
}
diff.NotificationEndpoints = diffEndpoints
diffRules, err := s.dryRunNotificationRules(ctx, orgID, pkg)
if err != nil {
return Summary{}, Diff{}, err
return Summary{}, Diff{}, nil, err
}
diff.NotificationRules = diffRules
diffLabelMappings, err := s.dryRunLabelMappings(ctx, pkg)
stateLabelMappings, err := s.dryRunLabelMappingsV2(ctx, state)
if err != nil {
return Summary{}, Diff{}, err
return Summary{}, Diff{}, nil, err
}
diff.LabelMappings = diffLabelMappings
state.labelMappings = stateLabelMappings
// verify the pkg is verified by a dry run. when calling Service.Apply this
// is required to have been run. if it is not true, then apply runs
// the Dry run.
pkg.isVerified = true
return pkg.Summary(), diff, parseErr
stateDiff := state.diff()
diffLabelMappings, err := s.dryRunLabelMappings(ctx, pkg, state)
if err != nil {
return Summary{}, Diff{}, nil, err
}
diff.LabelMappings = append(diffLabelMappings, diffLabelMappings...)
diff.Buckets = stateDiff.Buckets
diff.Labels = stateDiff.Labels
diff.LabelMappings = append(stateDiff.LabelMappings, diff.LabelMappings...)
return pkg.Summary(), diff, state, parseErr
}
func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffBucket {
mExistingBkts := make(map[string]DiffBucket)
bkts := pkg.buckets()
for i := range bkts {
b := bkts[i]
existingBkt, _ := s.findBucket(ctx, orgID, b) // ignoring error here
b.existing = existingBkt
mExistingBkts[b.Name()] = newDiffBucket(b, existingBkt)
func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, bkts map[string]*stateBucket) {
for _, stateBkt := range bkts {
if stateBkt.ID() != 0 {
stateBkt.existing, _ = s.bucketSVC.FindBucketByID(ctx, stateBkt.ID())
} else {
stateBkt.existing, _ = s.bucketSVC.FindBucketByName(ctx, orgID, stateBkt.Name())
}
}
diffs := make([]DiffBucket, 0, len(mExistingBkts))
for _, diff := range mExistingBkts {
diffs = append(diffs, diff)
}
sort.Slice(diffs, func(i, j int) bool {
return diffs[i].PkgName < diffs[j].PkgName
})
return diffs
}
func (s *Service) dryRunChecks(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffCheck {
@ -756,25 +760,11 @@ func (s *Service) dryRunDashboards(pkg *Pkg) []DiffDashboard {
return diffs
}
func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffLabel {
mExistingLabels := make(map[string]DiffLabel)
labels := pkg.labels()
for i := range labels {
pkgLabel := labels[i]
func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, labels map[string]*stateLabel) {
for _, pkgLabel := range labels {
existingLabel, _ := s.findLabel(ctx, orgID, pkgLabel)
pkgLabel.existing = existingLabel
mExistingLabels[pkgLabel.Name()] = newDiffLabel(pkgLabel, existingLabel)
}
diffs := make([]DiffLabel, 0, len(mExistingLabels))
for _, diff := range mExistingLabels {
diffs = append(diffs, diff)
}
sort.Slice(diffs, func(i, j int) bool {
return diffs[i].PkgName < diffs[j].PkgName
})
return diffs
}
func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationEndpoint, error) {
@ -962,7 +952,7 @@ func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, pkg *P
}
type (
labelMappingDiffFn func(labelID influxdb.ID, labelName string, isNew bool)
labelMappingDiffFn func(labelID influxdb.ID, labelPkgName, labelName string, isNew bool)
labelMappers interface {
Association(i int) labelAssociater
@ -972,15 +962,15 @@ type (
labelAssociater interface {
ID() influxdb.ID
Name() string
PkgName() string
Labels() []*label
ResourceType() influxdb.ResourceType
Exists() bool
}
)
func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabelMapping, error) {
func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg, state *stateCoordinator) ([]DiffLabelMapping, error) {
mappers := []labelMappers{
mapperBuckets(pkg.buckets()),
mapperChecks(pkg.checks()),
mapperDashboards(pkg.dashboards()),
mapperNotificationEndpoints(pkg.notificationEndpoints()),
@ -994,19 +984,26 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe
for _, mapper := range mappers {
for i := 0; i < mapper.Len(); i++ {
la := mapper.Association(i)
err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelName string, isNew bool) {
existingLabel, ok := pkg.mLabels[labelName]
err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelPkgName, labelName string, isNew bool) {
existingLabel, ok := state.mLabels[labelName]
if !ok {
return
}
existingLabel.setMapping(la, !isNew)
status := StateStatusExists
if isNew {
status = StateStatusNew
}
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: la.ResourceType(),
ResID: SafeID(la.ID()),
ResName: la.Name(),
LabelID: SafeID(labelID),
LabelName: labelName,
StateStatus: status,
ResType: la.ResourceType(),
ResID: SafeID(la.ID()),
ResPkgName: la.PkgName(),
ResName: la.Name(),
LabelID: SafeID(labelID),
LabelPkgName: labelPkgName,
LabelName: labelName,
})
})
if err != nil {
@ -1039,7 +1036,7 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe
func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, mappingFn labelMappingDiffFn) error {
if !la.Exists() {
for _, l := range la.Labels() {
mappingFn(l.ID(), l.Name(), true)
mappingFn(l.ID(), l.PkgName(), l.Name(), true)
}
return nil
}
@ -1059,22 +1056,109 @@ func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssoci
pkgLabels := labelSlcToMap(la.Labels())
for _, l := range existingLabels {
// should ignore any labels that are not specified in pkg
mappingFn(l.ID, l.Name, false)
// if label is found in state then we track the mapping and mark it existing
// otherwise we continue on
delete(pkgLabels, l.Name)
if pkgLabel, ok := pkgLabels[l.Name]; ok {
mappingFn(l.ID, pkgLabel.PkgName(), l.Name, false)
}
}
// now we add labels that were not apart of the existing labels
for _, l := range pkgLabels {
mappingFn(l.ID(), l.Name(), true)
mappingFn(l.ID(), l.PkgName(), l.Name(), true)
}
return nil
}
func (s *Service) addStackPkgState(ctx context.Context, stackID influxdb.ID, pkg *Pkg) error {
// dryRunLabelMappingsV2 walks the stateful resources (currently only buckets)
// and computes the label mappings that an apply would produce for them.
// Resources and labels that are slated for removal are skipped entirely.
// Note the lookup map is keyed by the label's display Name, not its pkg name,
// because existing platform mappings are matched by name downstream.
func (s *Service) dryRunLabelMappingsV2(ctx context.Context, state *stateCoordinator) ([]stateLabelMapping, error) {
	labelsByName := make(map[string]*stateLabel, len(state.mLabels))
	for _, stLabel := range state.mLabels {
		if stLabel.shouldRemove {
			continue
		}
		labelsByName[stLabel.Name()] = stLabel
	}

	var out []stateLabelMapping
	for _, stBucket := range state.mBuckets {
		if stBucket.shouldRemove {
			continue
		}
		bucketMappings, err := s.dryRunResourceLabelMappingV2(ctx, state, labelsByName, stBucket)
		if err != nil {
			return nil, err
		}
		out = append(out, bucketMappings...)
	}

	return out, nil
}
// dryRunResourceLabelMappingV2 determines the label mappings (and their state
// status) for a single resource. For a resource that does not exist on the
// platform yet, every pkg-declared label association is a new mapping. For an
// existing resource, the platform's current mappings are reconciled against
// the pkg's declared labels: matches are marked existing, and pkg labels with
// no current mapping are marked new.
func (s *Service) dryRunResourceLabelMappingV2(ctx context.Context, state *stateCoordinator, stateLabelsByResName map[string]*stateLabel, associatedResource interface {
	labels() []*label
	stateIdentity() stateIdentity
}) ([]stateLabelMapping, error) {
	ident := associatedResource.stateIdentity()
	pkgResourceLabels := associatedResource.labels()

	var mappings []stateLabelMapping
	if !ident.exists() {
		// resource is brand new; every declared association becomes a new mapping.
		for _, l := range pkgResourceLabels {
			mappings = append(mappings, stateLabelMapping{
				status:   StateStatusNew,
				resource: associatedResource,
				label:    state.getLabelByPkgName(l.PkgName()),
			})
		}
		return mappings, nil
	}

	existingLabels, err := s.labelSVC.FindResourceLabels(ctx, influxdb.LabelMappingFilter{
		ResourceID:   ident.id,
		ResourceType: ident.resourceType,
	})
	if err != nil {
		return nil, err
	}

	pkgLabels := labelSlcToMap(pkgResourceLabels)
	for _, l := range existingLabels {
		// if label is found in state then we track the mapping and mark it existing
		// otherwise we continue on; either way the label is removed from pkgLabels
		// so only unmatched pkg labels remain for the "new" pass below.
		delete(pkgLabels, l.Name)
		if sLabel, ok := stateLabelsByResName[l.Name]; ok {
			mappings = append(mappings, stateLabelMapping{
				status:   StateStatusExists,
				resource: associatedResource,
				label:    sLabel,
			})
		}
	}

	// now we add labels that do not exist as mappings on the platform yet
	for _, l := range pkgLabels {
		mappings = append(mappings, stateLabelMapping{
			status:   StateStatusNew,
			resource: associatedResource,
			label:    state.getLabelByPkgName(l.PkgName()),
		})
	}

	return mappings, nil
}
func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, pkg *Pkg, state *stateCoordinator) error {
stack, err := s.store.ReadStackByID(ctx, stackID)
if err != nil {
return internalErr(err)
return ierrors.Wrap(internalErr(err), "reading stack")
}
type stateMapper interface {
Contains(kind Kind, pkgName string) bool
setObjectID(kind Kind, pkgName string, id influxdb.ID)
addObjectForRemoval(kind Kind, pkgName string, id influxdb.ID)
}
// check resource exists in pkg
@ -1083,9 +1167,16 @@ func (s *Service) addStackPkgState(ctx context.Context, stackID influxdb.ID, pkg
// else
// add stub pkg resource that indicates it should be deleted
for _, r := range stack.Resources {
updateFn := pkg.setObjectID
if !pkg.Contains(r.Kind, r.Name) {
updateFn = pkg.addObjectForRemoval
var mapper stateMapper = pkg
if r.Kind.is(KindBucket, KindLabel) {
// hack for time being while we transition state out of pkg.
// this will take several passes to finish up.
mapper = state
}
updateFn := mapper.setObjectID
if !mapper.Contains(r.Kind, r.Name) {
updateFn = mapper.addObjectForRemoval
}
updateFn(r.Kind, r.Name, r.ID)
}
@ -1135,23 +1226,22 @@ func applyOptFromOptFns(opts ...ApplyOptFn) ApplyOpt {
// Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied
// in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state
// from before the pkg were applied.
func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, e error) {
func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, diff Diff, e error) {
if !pkg.isParsed {
if err := pkg.Validate(); err != nil {
return Summary{}, failedValidationErr(err)
return Summary{}, Diff{}, failedValidationErr(err)
}
}
opt := applyOptFromOptFns(opts...)
if err := pkg.applyEnvRefs(opt.EnvRefs); err != nil {
return Summary{}, failedValidationErr(err)
return Summary{}, Diff{}, failedValidationErr(err)
}
if !pkg.isVerified {
if _, _, err := s.DryRun(ctx, orgID, userID, pkg, opts...); err != nil {
return Summary{}, err
}
_, diff, state, err := s.dryRun(ctx, orgID, pkg, opts...)
if err != nil {
return Summary{}, Diff{}, err
}
defer func() {
@ -1164,7 +1254,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
if e != nil {
updateStackFn = s.updateStackAfterRollback
}
if err := updateStackFn(ctx, stackID, pkg); err != nil {
if err := updateStackFn(ctx, stackID, pkg, state); err != nil {
s.log.Error("failed to update stack", zap.Error(err))
}
}()
@ -1172,15 +1262,15 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
coordinator := &rollbackCoordinator{sem: make(chan struct{}, s.applyReqLimit)}
defer coordinator.rollback(s.log, &e, orgID)
sum, err := s.apply(ctx, coordinator, orgID, userID, pkg, opt.MissingSecrets)
sum, err = s.apply(ctx, coordinator, orgID, userID, pkg, state, opt.MissingSecrets)
if err != nil {
return Summary{}, err
return Summary{}, Diff{}, err
}
return sum, nil
return sum, diff, err
}
func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, orgID, userID influxdb.ID, pkg *Pkg, missingSecrets map[string]string) (sum Summary, e error) {
func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, orgID, userID influxdb.ID, pkg *Pkg, state *stateCoordinator, missingSecrets map[string]string) (sum Summary, e error) {
// each grouping here runs for its entirety, then returns an error that
// is indicative of running all appliers provided. For instance, the labels
// may have 1 variable fail and one of the buckets fails. The errors aggregate so
@ -1198,12 +1288,12 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o
},
{
// deps for primary resources
s.applyLabels(ctx, pkg.labels()),
s.applyLabels(ctx, state.labels()),
},
{
// primary resources, can have relationships to labels
s.applyVariables(ctx, pkg.variables()),
s.applyBuckets(ctx, pkg.buckets()),
s.applyBuckets(ctx, state.buckets()),
s.applyChecks(ctx, pkg.checks()),
s.applyDashboards(pkg.dashboards()),
s.applyNotificationEndpoints(ctx, userID, pkg.notificationEndpoints()),
@ -1230,27 +1320,46 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o
// secondary resources
// this last grouping relies on the above 2 steps having completely successfully
secondary := []applier{s.applyLabelMappings(pkg.labelMappings())}
secondary := []applier{
s.applyLabelMappings(pkg.labelMappings()),
s.applyLabelMappingsV2(state.labelMappings),
}
if err := coordinator.runTilEnd(ctx, orgID, userID, secondary...); err != nil {
return Summary{}, internalErr(err)
}
pkg.applySecrets(missingSecrets)
return pkg.Summary(), nil
stateSum := state.summary()
pkgSum := pkg.Summary()
pkgSum.Buckets = stateSum.Buckets
pkgSum.Labels = stateSum.Labels
// filter out label mappings that are from pgk and replace with those
// in state. This is temporary hack to provide a bridge to the promise land...
for _, lm := range pkgSum.LabelMappings {
if lm.ResourceType == influxdb.BucketsResourceType {
continue
}
stateSum.LabelMappings = append(stateSum.LabelMappings, lm)
}
pkgSum.LabelMappings = stateSum.LabelMappings
return pkgSum, nil
}
func (s *Service) applyBuckets(ctx context.Context, buckets []*bucket) applier {
func (s *Service) applyBuckets(ctx context.Context, buckets []*stateBucket) applier {
const resource = "bucket"
mutex := new(doMutex)
rollbackBuckets := make([]*bucket, 0, len(buckets))
rollbackBuckets := make([]*stateBucket, 0, len(buckets))
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var b bucket
var b *stateBucket
mutex.Do(func() {
buckets[i].OrgID = orgID
b = *buckets[i]
buckets[i].orgID = orgID
b = buckets[i]
})
if !b.shouldApply() {
return nil
@ -1284,20 +1393,21 @@ func (s *Service) applyBuckets(ctx context.Context, buckets []*bucket) applier {
}
}
func (s *Service) rollbackBuckets(ctx context.Context, buckets []*bucket) error {
rollbackFn := func(b *bucket) error {
func (s *Service) rollbackBuckets(ctx context.Context, buckets []*stateBucket) error {
rollbackFn := func(b *stateBucket) error {
var err error
switch {
case b.shouldRemove:
err = s.bucketSVC.CreateBucket(ctx, b.existing)
err = ierrors.Wrap(s.bucketSVC.CreateBucket(ctx, b.existing), "rolling back removed bucket")
case b.existing == nil:
err = s.bucketSVC.DeleteBucket(ctx, b.ID())
err = ierrors.Wrap(s.bucketSVC.DeleteBucket(ctx, b.ID()), "rolling back new bucket")
default:
rp := b.RetentionRules.RP()
_, err = s.bucketSVC.UpdateBucket(ctx, b.ID(), influxdb.BucketUpdate{
Description: &b.Description,
RetentionPeriod: &rp,
})
err = ierrors.Wrap(err, "rolling back existing bucket to previous state")
}
return err
}
@ -1317,7 +1427,7 @@ func (s *Service) rollbackBuckets(ctx context.Context, buckets []*bucket) error
return nil
}
func (s *Service) applyBucket(ctx context.Context, b bucket) (influxdb.Bucket, error) {
func (s *Service) applyBucket(ctx context.Context, b *stateBucket) (influxdb.Bucket, error) {
if b.shouldRemove {
if err := s.bucketSVC.DeleteBucket(ctx, b.ID()); err != nil {
return influxdb.Bucket{}, fmt.Errorf("failed to delete bucket[%q]: %w", b.ID(), err)
@ -1340,7 +1450,7 @@ func (s *Service) applyBucket(ctx context.Context, b bucket) (influxdb.Bucket, e
}
influxBucket := influxdb.Bucket{
OrgID: b.OrgID,
OrgID: b.orgID,
Description: b.Description,
Name: b.Name(),
RetentionPeriod: rp,
@ -1542,17 +1652,17 @@ func convertChartsToCells(ch []chart) []*influxdb.Cell {
return icells
}
func (s *Service) applyLabels(ctx context.Context, labels []*label) applier {
func (s *Service) applyLabels(ctx context.Context, labels []*stateLabel) applier {
const resource = "label"
mutex := new(doMutex)
rollBackLabels := make([]*label, 0, len(labels))
rollBackLabels := make([]*stateLabel, 0, len(labels))
createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var l label
var l *stateLabel
mutex.Do(func() {
labels[i].OrgID = orgID
l = *labels[i]
labels[i].orgID = orgID
l = labels[i]
})
if !l.shouldApply() {
return nil
@ -1568,6 +1678,7 @@ func (s *Service) applyLabels(ctx context.Context, labels []*label) applier {
mutex.Do(func() {
labels[i].id = influxLabel.ID
labels[i].label.id = influxLabel.ID
rollBackLabels = append(rollBackLabels, labels[i])
})
@ -1586,8 +1697,8 @@ func (s *Service) applyLabels(ctx context.Context, labels []*label) applier {
}
}
func (s *Service) rollbackLabels(ctx context.Context, labels []*label) error {
rollbackFn := func(l *label) error {
func (s *Service) rollbackLabels(ctx context.Context, labels []*stateLabel) error {
rollbackFn := func(l *stateLabel) error {
var err error
switch {
case l.shouldRemove:
@ -1617,32 +1728,30 @@ func (s *Service) rollbackLabels(ctx context.Context, labels []*label) error {
return nil
}
func (s *Service) applyLabel(ctx context.Context, l label) (influxdb.Label, error) {
if l.shouldRemove {
if err := s.labelSVC.DeleteLabel(ctx, l.ID()); err != nil {
return influxdb.Label{}, fmt.Errorf("failed to delete label[%q]: %w", l.ID(), err)
}
return *l.existing, nil
}
if l.existing != nil {
updatedlabel, err := s.labelSVC.UpdateLabel(ctx, l.ID(), influxdb.LabelUpdate{
func (s *Service) applyLabel(ctx context.Context, l *stateLabel) (influxdb.Label, error) {
var (
influxLabel *influxdb.Label
err error
)
switch {
case l.shouldRemove:
influxLabel, err = l.existing, s.labelSVC.DeleteLabel(ctx, l.ID())
case l.Exists():
influxLabel, err = s.labelSVC.UpdateLabel(ctx, l.ID(), influxdb.LabelUpdate{
Name: l.Name(),
Properties: l.properties(),
})
if err != nil {
return influxdb.Label{}, err
}
return *updatedlabel, nil
err = ierrors.Wrap(err, "updating")
default:
creatLabel := l.toInfluxLabel()
influxLabel = &creatLabel
err = ierrors.Wrap(s.labelSVC.CreateLabel(ctx, &creatLabel), "creating")
}
influxLabel := l.toInfluxLabel()
err := s.labelSVC.CreateLabel(ctx, &influxLabel)
if err != nil {
if err != nil || influxLabel == nil {
return influxdb.Label{}, err
}
return influxLabel, nil
return *influxLabel, nil
}
func (s *Service) applyNotificationEndpoints(ctx context.Context, userID influxdb.ID, endpoints []*notificationEndpoint) applier {
@ -2141,6 +2250,78 @@ func (s *Service) applyVariable(ctx context.Context, v variable) (influxdb.Varia
return influxVar, nil
}
// applyLabelMappingsV2 builds the applier that creates the stateful label
// mappings during apply. Only mappings whose status is new (and whose label
// and resource both resolved to non-zero IDs) are written; successfully
// created mappings are recorded for rollback.
func (s *Service) applyLabelMappingsV2(labelMappings []stateLabelMapping) applier {
	const resource = "label_mapping"

	mutex := new(doMutex)
	rollbackMappings := make([]stateLabelMapping, 0, len(labelMappings))

	createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
		var mapping stateLabelMapping
		// reads of the shared slice are serialized through the mutex because
		// createFn runs concurrently across indexes.
		mutex.Do(func() {
			mapping = labelMappings[i]
		})

		ident := mapping.resource.stateIdentity()
		if exists(mapping.status) || mapping.label.ID() == 0 || ident.id == 0 {
			// this block here does 2 things, it does not write a
			// mapping when one exists. it also avoids having to worry
			// about deleting an existing mapping since it will not be
			// passed to the delete function below b/c it is never added
			// to the list of mappings that is referenced in the delete
			// call.
			return nil
		}

		m := influxdb.LabelMapping{
			LabelID:      mapping.label.ID(),
			ResourceID:   ident.id,
			ResourceType: ident.resourceType,
		}
		err := s.labelSVC.CreateLabelMapping(ctx, &m)
		if err != nil {
			return &applyErrBody{
				name: fmt.Sprintf("%s:%s:%s", ident.resourceType, ident.id, mapping.label.ID()),
				msg:  err.Error(),
			}
		}

		// only mappings that were actually created are eligible for rollback.
		mutex.Do(func() {
			rollbackMappings = append(rollbackMappings, mapping)
		})

		return nil
	}

	return applier{
		creater: creater{
			entries: len(labelMappings),
			fn:      createFn,
		},
		rollbacker: rollbacker{
			resource: resource,
			fn:       func(_ influxdb.ID) error { return s.rollbackLabelMappingsV2(rollbackMappings) },
		},
	}
}
// rollbackLabelMappingsV2 deletes the label mappings that were created during
// this apply. Deletion failures are collected and reported together in a
// single error rather than aborting on the first failure.
func (s *Service) rollbackLabelMappingsV2(mappings []stateLabelMapping) error {
	var failedPairs []string
	for _, m := range mappings {
		influxMapping := stateLabelMappingToInfluxLabelMapping(m)
		// NOTE(review): uses context.Background() rather than a caller ctx —
		// presumably so cleanup is not abandoned by a cancelled apply; confirm.
		if err := s.labelSVC.DeleteLabelMapping(context.Background(), &influxMapping); err != nil {
			failedPairs = append(failedPairs, fmt.Sprintf("%s:%s", m.label.ID(), m.resource.stateIdentity().id))
		}
	}

	if len(failedPairs) == 0 {
		return nil
	}
	return fmt.Errorf(`label_resource_id_pairs=[%s] err="unable to delete label"`, strings.Join(failedPairs, ", "))
}
func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applier {
const resource = "label_mapping"
@ -2152,7 +2333,8 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie
mutex.Do(func() {
mapping = labelMappings[i]
})
if mapping.exists || mapping.LabelID == 0 || mapping.ResourceID == 0 {
if exists(mapping.Status) || mapping.LabelID == 0 || mapping.ResourceID == 0 {
// this block here does 2 things, it does not write a
// mapping when one exists. it also avoids having to worry
// about deleting an existing mapping since it will not be
@ -2228,14 +2410,14 @@ func (s *Service) deleteByIDs(resource string, numIDs int, deleteFn func(context
return nil
}
func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, pkg *Pkg) error {
func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, pkg *Pkg, state *stateCoordinator) error {
stack, err := s.store.ReadStackByID(ctx, stackID)
if err != nil {
return err
}
var stackResources []StackResource
for _, b := range pkg.buckets() {
for _, b := range state.mBuckets {
if b.shouldRemove {
continue
}
@ -2268,7 +2450,7 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.
Name: n.PkgName(),
})
}
for _, l := range pkg.labels() {
for _, l := range state.mLabels {
if l.shouldRemove {
continue
}
@ -2296,7 +2478,7 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.
return s.store.UpdateStack(ctx, stack)
}
func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb.ID, pkg *Pkg) error {
func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb.ID, pkg *Pkg, state *stateCoordinator) error {
stack, err := s.store.ReadStackByID(ctx, stackID)
if err != nil {
return err
@ -2321,7 +2503,7 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb
// these are the case where a deletion happens and is rolled back creating a new resource.
// when resource is not to be removed this is a nothing burger, as it should be
// rolled back to previous state.
for _, b := range pkg.buckets() {
for _, b := range state.mBuckets {
if b.shouldRemove {
res := existingResources[newKey(KindBucket, b.PkgName())]
if res.ID != b.ID() {
@ -2348,7 +2530,7 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb
}
}
}
for _, l := range pkg.labels() {
for _, l := range state.mLabels {
if l.shouldRemove {
res := existingResources[newKey(KindLabel, l.PkgName())]
if res.ID != l.ID() {
@ -2375,14 +2557,6 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb
return s.store.UpdateStack(ctx, stack)
}
func (s *Service) findBucket(ctx context.Context, orgID influxdb.ID, b *bucket) (*influxdb.Bucket, error) {
if b.ID() != 0 {
return s.bucketSVC.FindBucketByID(ctx, b.ID())
}
return s.bucketSVC.FindBucketByName(ctx, orgID, b.Name())
}
func (s *Service) findCheck(ctx context.Context, orgID influxdb.ID, c *check) (influxdb.Check, error) {
if c.ID() != 0 {
return s.checkSVC.FindCheckByID(ctx, c.ID())
@ -2395,7 +2569,7 @@ func (s *Service) findCheck(ctx context.Context, orgID influxdb.ID, c *check) (i
})
}
func (s *Service) findLabel(ctx context.Context, orgID influxdb.ID, l *label) (*influxdb.Label, error) {
func (s *Service) findLabel(ctx context.Context, orgID influxdb.ID, l *stateLabel) (*influxdb.Label, error) {
if l.ID() != 0 {
return s.labelSVC.FindLabelByID(ctx, l.ID())
}

View File

@ -44,6 +44,6 @@ func (s *authMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
return s.next.DryRun(ctx, orgID, userID, pkg, opts...)
}
func (s *authMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) {
func (s *authMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) {
return s.next.Apply(ctx, orgID, userID, pkg, opts...)
}

View File

@ -83,7 +83,7 @@ func (s *loggingMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *
return s.next.DryRun(ctx, orgID, userID, pkg, opts...)
}
func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, err error) {
func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, diff Diff, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {

View File

@ -45,8 +45,8 @@ func (s *mwMetrics) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *
return sum, diff, rec(err)
}
func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) {
func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) {
rec := s.rec.Record("apply")
sum, err := s.next.Apply(ctx, orgID, userID, pkg, opts...)
return sum, rec(err)
sum, diff, err := s.next.Apply(ctx, orgID, userID, pkg, opts...)
return sum, diff, rec(err)
}

409
pkger/service_models.go Normal file
View File

@ -0,0 +1,409 @@
package pkger
import (
"sort"
"github.com/influxdata/influxdb/v2"
)
// stateCoordinator tracks the stateful counterparts of parsed pkg resources
// (currently buckets and labels) along with the label mappings computed for
// them during a dry run. Resource maps are keyed by pkg name.
type stateCoordinator struct {
	mBuckets map[string]*stateBucket // keyed by pkg name
	mLabels  map[string]*stateLabel  // keyed by pkg name

	labelMappings []stateLabelMapping
}
// newStateCoordinator seeds a stateCoordinator from the parsed pkg, wrapping
// each bucket and label object in its stateful counterpart keyed by pkg name.
func newStateCoordinator(pkg *Pkg) *stateCoordinator {
	out := &stateCoordinator{
		mBuckets: make(map[string]*stateBucket),
		mLabels:  make(map[string]*stateLabel),
	}

	for _, b := range pkg.buckets() {
		out.mBuckets[b.PkgName()] = &stateBucket{bucket: b}
	}
	for _, l := range pkg.labels() {
		out.mLabels[l.PkgName()] = &stateLabel{label: l}
	}

	return out
}
// buckets returns the tracked stateful buckets as a slice. Iteration order
// follows map order and is therefore not deterministic.
func (s *stateCoordinator) buckets() []*stateBucket {
	bkts := make([]*stateBucket, 0, len(s.mBuckets))
	for _, b := range s.mBuckets {
		bkts = append(bkts, b)
	}
	return bkts
}
// labels returns the tracked stateful labels as a slice. Iteration order
// follows map order and is therefore not deterministic.
func (s *stateCoordinator) labels() []*stateLabel {
	lbls := make([]*stateLabel, 0, len(s.mLabels))
	for _, l := range s.mLabels {
		lbls = append(lbls, l)
	}
	return lbls
}
// diff assembles the Diff for all stateful resources. Buckets and labels are
// sorted by pkg name; label mappings by resource type, then resource pkg
// name, then label name, so output is deterministic despite map iteration.
func (s *stateCoordinator) diff() Diff {
	var out Diff

	for _, b := range s.mBuckets {
		out.Buckets = append(out.Buckets, b.diffBucket())
	}
	sort.Slice(out.Buckets, func(i, j int) bool {
		return out.Buckets[i].PkgName < out.Buckets[j].PkgName
	})

	for _, l := range s.mLabels {
		out.Labels = append(out.Labels, l.diffLabel())
	}
	sort.Slice(out.Labels, func(i, j int) bool {
		return out.Labels[i].PkgName < out.Labels[j].PkgName
	})

	for _, m := range s.labelMappings {
		out.LabelMappings = append(out.LabelMappings, m.diffLabelMapping())
	}
	sort.Slice(out.LabelMappings, func(i, j int) bool {
		lhs, rhs := out.LabelMappings[i], out.LabelMappings[j]
		switch {
		case lhs.ResType != rhs.ResType:
			return lhs.ResType < rhs.ResType
		case lhs.ResPkgName != rhs.ResPkgName:
			return lhs.ResPkgName < rhs.ResPkgName
		default:
			return lhs.LabelName < rhs.LabelName
		}
	})

	return out
}
// summary assembles the Summary for all stateful resources. Resources marked
// for removal are excluded (unlike diff, which reports them). Buckets and
// labels sort by pkg name; mappings by resource type, resource pkg name, then
// label name, for deterministic output.
func (s *stateCoordinator) summary() Summary {
	var sum Summary
	for _, b := range s.buckets() {
		if b.shouldRemove {
			continue
		}
		sum.Buckets = append(sum.Buckets, b.summarize())
	}
	sort.Slice(sum.Buckets, func(i, j int) bool {
		return sum.Buckets[i].PkgName < sum.Buckets[j].PkgName
	})

	for _, l := range s.labels() {
		if l.shouldRemove {
			continue
		}
		sum.Labels = append(sum.Labels, l.summarize())
	}
	sort.Slice(sum.Labels, func(i, j int) bool {
		return sum.Labels[i].PkgName < sum.Labels[j].PkgName
	})

	for _, m := range s.labelMappings {
		sum.LabelMappings = append(sum.LabelMappings, m.summarize())
	}
	sort.Slice(sum.LabelMappings, func(i, j int) bool {
		n, m := sum.LabelMappings[i], sum.LabelMappings[j]
		if n.ResourceType != m.ResourceType {
			return n.ResourceType < m.ResourceType
		}
		if n.ResourcePkgName != m.ResourcePkgName {
			return n.ResourcePkgName < m.ResourcePkgName
		}
		return n.LabelName < m.LabelName
	})

	return sum
}
// getLabelByPkgName returns the stateful label tracked under the given pkg
// name, or nil when no such label is tracked.
func (s *stateCoordinator) getLabelByPkgName(pkgName string) *stateLabel {
	return s.mLabels[pkgName]
}
// Contains reports whether the state tracks an object of the given kind under
// the given pkg name. Only kinds routed through getObjectIDSetter (buckets
// and labels) can ever return true.
func (s *stateCoordinator) Contains(k Kind, pkgName string) bool {
	_, ok := s.getObjectIDSetter(k, pkgName)
	return ok
}
// setObjectID sets the id for the resource graphed from the object the key
// identifies. It is a no-op for kinds the state does not track or pkg names
// it has not seen.
func (s *stateCoordinator) setObjectID(k Kind, pkgName string, id influxdb.ID) {
	idSetFn, ok := s.getObjectIDSetter(k, pkgName)
	if !ok {
		return
	}

	idSetFn(id)
}
// addObjectForRemoval records a stub resource, identified by kind and pkg
// name, whose sole purpose is to mark the existing platform resource with the
// given id for deletion. The pkgName and kind are the unique identifier; any
// entry already stored under the same key is overwritten. If that matters,
// check first with the Contains method. Kinds other than bucket and label are
// silently ignored.
func (s *stateCoordinator) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) {
	newIdentity := identity{
		name: &references{val: pkgName},
	}

	switch k {
	case KindBucket:
		s.mBuckets[pkgName] = &stateBucket{
			id:           id,
			bucket:       &bucket{identity: newIdentity},
			shouldRemove: true,
		}
	case KindLabel:
		s.mLabels[pkgName] = &stateLabel{
			id:           id,
			label:        &label{identity: newIdentity},
			shouldRemove: true,
		}
	}
}
// getObjectIDSetter returns a closure that assigns an id to the tracked
// resource of the given kind and pkg name, plus whether such a resource is
// tracked. NOTE: when ok is false for bucket/label kinds the returned closure
// captures a nil entry and must not be called; callers are expected to check
// ok first.
func (s *stateCoordinator) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool) {
	switch k {
	case KindBucket:
		r, ok := s.mBuckets[pkgName]
		return func(id influxdb.ID) {
			r.id = id
		}, ok
	case KindLabel:
		r, ok := s.mLabels[pkgName]
		return func(id influxdb.ID) {
			r.id = id
		}, ok
	default:
		return nil, false
	}
}
// stateIdentity is the minimal identifying information a stateful resource
// exposes for label-mapping and stack bookkeeping purposes.
type stateIdentity struct {
	id           influxdb.ID             // platform ID; zero when the resource does not exist yet
	name         string                  // display name
	pkgName      string                  // unique name within the pkg
	resourceType influxdb.ResourceType   // platform resource type (e.g. buckets)
	shouldRemove bool                    // marked for deletion from the platform
}
// exists reports whether the resource exists on the platform, indicated by a
// non-zero platform ID.
func (s stateIdentity) exists() bool {
	return s.id != 0
}
// stateBucket pairs a parsed pkg bucket with its platform state: the assigned
// IDs, whether it is slated for removal, and the existing platform bucket (if
// one was found during dry run).
type stateBucket struct {
	id, orgID    influxdb.ID
	shouldRemove bool

	existing *influxdb.Bucket // nil when the bucket does not exist on the platform

	*bucket // embedded parsed pkg bucket
}
// diffBucket renders the bucket's pending change as a DiffBucket: New always
// carries the pkg's desired values, Old is populated only when an existing
// platform bucket was found.
func (b *stateBucket) diffBucket() DiffBucket {
	diff := DiffBucket{
		DiffIdentifier: DiffIdentifier{
			ID:      SafeID(b.ID()),
			Remove:  b.shouldRemove,
			PkgName: b.PkgName(),
		},
		New: DiffBucketValues{
			Name:           b.Name(),
			Description:    b.Description,
			RetentionRules: b.RetentionRules,
		},
	}
	if e := b.existing; e != nil {
		diff.Old = &DiffBucketValues{
			Name:        e.Name,
			Description: e.Description,
		}
		// a zero retention period means "infinite" and is rendered as no rules.
		if e.RetentionPeriod > 0 {
			diff.Old.RetentionRules = retentionRules{newRetentionRule(e.RetentionPeriod)}
		}
	}
	return diff
}
// summarize extends the parsed bucket's summary with the stateful IDs
// (platform ID and org ID) resolved during dry run/apply.
func (b *stateBucket) summarize() SummaryBucket {
	sum := b.bucket.summarize()
	sum.ID = SafeID(b.ID())
	sum.OrgID = SafeID(b.orgID)
	return sum
}
// Exists reports whether a matching bucket was found on the platform.
func (b *stateBucket) Exists() bool {
	return b.existing != nil
}
// ID returns the existing platform bucket's ID when one was found, otherwise
// the id assigned from stack state (zero for a brand-new bucket).
func (b *stateBucket) ID() influxdb.ID {
	if !b.Exists() {
		return b.id
	}
	return b.existing.ID
}
// resourceType identifies the platform resource type for label mappings.
func (b *stateBucket) resourceType() influxdb.ResourceType {
	return KindBucket.ResourceType()
}
// labels exposes the pkg-declared label associations of the embedded bucket.
func (b *stateBucket) labels() []*label {
	return b.bucket.labels
}
// stateIdentity packages the bucket's identifying state for consumers such as
// the label-mapping dry run.
func (b *stateBucket) stateIdentity() stateIdentity {
	return stateIdentity{
		id:           b.ID(),
		name:         b.Name(),
		pkgName:      b.PkgName(),
		resourceType: b.resourceType(),
		shouldRemove: b.shouldRemove,
	}
}
// shouldApply reports whether the bucket requires a write during apply: it is
// marked for removal, does not exist yet, or differs from the existing bucket
// in description, name, or retention period.
func (b *stateBucket) shouldApply() bool {
	return b.shouldRemove ||
		b.existing == nil ||
		b.Description != b.existing.Description ||
		b.Name() != b.existing.Name ||
		b.RetentionRules.RP() != b.existing.RetentionPeriod
}
// stateLabel pairs a parsed pkg label with its platform state: the assigned
// IDs, whether it is slated for removal, and the existing platform label (if
// one was found during dry run).
type stateLabel struct {
	id, orgID    influxdb.ID
	shouldRemove bool

	existing *influxdb.Label // nil when the label does not exist on the platform

	*label // embedded parsed pkg label
}
// diffLabel renders the label's pending change as a DiffLabel: New always
// carries the pkg's desired values, Old is populated only when an existing
// platform label was found (color/description read from its properties map).
func (l *stateLabel) diffLabel() DiffLabel {
	diff := DiffLabel{
		DiffIdentifier: DiffIdentifier{
			ID:      SafeID(l.ID()),
			Remove:  l.shouldRemove,
			PkgName: l.PkgName(),
		},
		New: DiffLabelValues{
			Name:        l.Name(),
			Description: l.Description,
			Color:       l.Color,
		},
	}
	if e := l.existing; e != nil {
		diff.Old = &DiffLabelValues{
			Name:        e.Name,
			Description: e.Properties["description"],
			Color:       e.Properties["color"],
		}
	}
	return diff
}
// summarize extends the parsed label's summary with the stateful IDs
// (platform ID and org ID) resolved during dry run/apply.
func (l *stateLabel) summarize() SummaryLabel {
	sum := l.label.summarize()
	sum.ID = SafeID(l.ID())
	sum.OrgID = SafeID(l.orgID)
	return sum
}
// Exists reports whether a matching label was found on the platform.
func (l *stateLabel) Exists() bool {
	return l.existing != nil
}
// ID returns the existing platform label's ID when one was found, otherwise
// the id assigned from stack state (zero for a brand-new label).
func (l *stateLabel) ID() influxdb.ID {
	if l.Exists() {
		return l.existing.ID
	}
	return l.id
}
// shouldApply reports whether the label requires a write during apply: it is
// marked for removal, does not exist yet, or differs from the existing label
// in description, name, or color.
//
// The shouldRemove check mirrors stateBucket.shouldApply. Without it, a label
// marked for deletion whose fields happen to match the existing platform
// label would be skipped by the apply loop and never deleted.
func (l *stateLabel) shouldApply() bool {
	return l.shouldRemove ||
		l.existing == nil ||
		l.Description != l.existing.Properties["description"] ||
		l.Name() != l.existing.Name ||
		l.Color != l.existing.Properties["color"]
}
// toInfluxLabel converts the state label into an influxdb.Label
// suitable for calls into the platform's label service.
func (l *stateLabel) toInfluxLabel() influxdb.Label {
	out := influxdb.Label{
		ID:         l.ID(),
		OrgID:      l.orgID,
		Name:       l.Name(),
		Properties: l.properties(),
	}
	return out
}
// properties maps the label's color and description into the
// Properties form the platform stores on influxdb.Label.
func (l *stateLabel) properties() map[string]string {
	props := make(map[string]string, 2)
	props["color"] = l.Color
	props["description"] = l.Description
	return props
}
// stateLabelMapping associates a stateful resource with a state
// label, tracking via status whether the mapping is new, exists, or
// is slated for removal.
type stateLabelMapping struct {
	status StateStatus

	// resource is any stateful datum that can identify itself for
	// diffing and summarizing (bucket, check, dashboard, etc.).
	resource interface {
		stateIdentity() stateIdentity
	}

	label *stateLabel
}
// diffLabelMapping converts the state label mapping into its diff
// form, capturing the identities of both the resource and the label
// alongside the mapping's state status.
func (lm stateLabelMapping) diffLabelMapping() DiffLabelMapping {
	res := lm.resource.stateIdentity()
	lbl := lm.label
	return DiffLabelMapping{
		StateStatus:  lm.status,
		ResType:      res.resourceType,
		ResID:        SafeID(res.id),
		ResPkgName:   res.pkgName,
		ResName:      res.name,
		LabelID:      SafeID(lbl.ID()),
		LabelPkgName: lbl.PkgName(),
		LabelName:    lbl.Name(),
	}
}
// summarize converts the state label mapping into its summary form,
// capturing the identities of both the resource and the label
// alongside the mapping's state status.
func (lm stateLabelMapping) summarize() SummaryLabelMapping {
	res := lm.resource.stateIdentity()
	lbl := lm.label
	return SummaryLabelMapping{
		Status:          lm.status,
		ResourceID:      SafeID(res.id),
		ResourcePkgName: res.pkgName,
		ResourceName:    res.name,
		ResourceType:    res.resourceType,
		LabelPkgName:    lbl.PkgName(),
		LabelName:       lbl.Name(),
		LabelID:         SafeID(lbl.ID()),
	}
}
// stateLabelMappingToInfluxLabelMapping converts a state label
// mapping into the influxdb.LabelMapping consumed by the platform's
// label service.
func stateLabelMappingToInfluxLabelMapping(mapping stateLabelMapping) influxdb.LabelMapping {
	res := mapping.resource.stateIdentity()
	out := influxdb.LabelMapping{
		LabelID:      mapping.label.ID(),
		ResourceID:   res.id,
		ResourceType: res.resourceType,
	}
	return out
}
// IsNew reports whether the given state status marks a resource as
// new to the platform (i.e. it will be created on apply).
func IsNew(status StateStatus) bool {
	return status == StateStatusNew
}
// exists reports whether the given state status marks a resource as
// already present on the platform.
func exists(status StateStatus) bool {
	return status == StateStatusExists
}

View File

@ -474,7 +474,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Buckets, 2)
@ -496,29 +496,35 @@ func TestService(t *testing.T) {
testfileRunner(t, "testdata/bucket.yml", func(t *testing.T, pkg *Pkg) {
orgID := influxdb.ID(9000)
pkg.isVerified = true
stubExisting := func(name string, id influxdb.ID) {
pkgBkt := pkg.mBuckets[name]
pkgBkt.existing = &influxdb.Bucket{
// makes all pkg changes same as they are on the existing bucket
ID: id,
OrgID: orgID,
Name: pkgBkt.Name(),
Description: pkgBkt.Description,
RetentionPeriod: pkgBkt.RetentionRules.RP(),
}
}
stubExisting("rucket_11", 3)
stubExisting("rucket_22", 4)
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.FindBucketByNameFn = func(ctx context.Context, oid influxdb.ID, name string) (*influxdb.Bucket, error) {
if orgID != oid {
return nil, errors.New("invalid org id")
}
id := influxdb.ID(3)
if name == "display name" {
id = 4
name = "rucket_22"
}
if bkt, ok := pkg.mBuckets[name]; ok {
return &influxdb.Bucket{
ID: id,
OrgID: oid,
Name: bkt.Name(),
Description: bkt.Description,
RetentionPeriod: bkt.RetentionRules.RP(),
}, nil
}
return nil, errors.New("not found")
}
fakeBktSVC.UpdateBucketFn = func(_ context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
return &influxdb.Bucket{ID: id}, nil
}
svc := newTestService(WithBucketSVC(fakeBktSVC))
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Buckets, 2)
@ -559,7 +565,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeBktSVC.DeleteBucketCalls.Count(), 1)
@ -580,7 +586,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Checks, 2)
@ -626,7 +632,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeCheckSVC.DeleteCheckCalls.Count(), 1)
@ -651,7 +657,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Labels, 3)
@ -704,7 +710,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelCalls.Count(), 1)
@ -712,13 +718,12 @@ func TestService(t *testing.T) {
})
t.Run("will not apply label if no changes to be applied", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
testfileRunner(t, "testdata/label.yml", func(t *testing.T, pkg *Pkg) {
orgID := influxdb.ID(9000)
pkg.isVerified = true
stubExisting := func(name string, id influxdb.ID) {
stubExisting := func(name string, id influxdb.ID) *influxdb.Label {
pkgLabel := pkg.mLabels[name]
pkgLabel.existing = &influxdb.Label{
return &influxdb.Label{
// makes all pkg changes same as they are on the existing
ID: id,
OrgID: orgID,
@ -733,6 +738,18 @@ func TestService(t *testing.T) {
stubExisting("label_3", 3)
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.FindLabelsFn = func(ctx context.Context, f influxdb.LabelFilter) ([]*influxdb.Label, error) {
if f.Name != "label_1" && f.Name != "display name" {
return nil, nil
}
id := influxdb.ID(1)
name := f.Name
if f.Name == "display name" {
id = 3
name = "label_3"
}
return []*influxdb.Label{stubExisting(name, id)}, nil
}
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
if l.Name == "label_2" {
l.ID = 2
@ -748,7 +765,7 @@ func TestService(t *testing.T) {
svc := newTestService(WithLabelSVC(fakeLabelSVC))
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Labels, 3)
@ -802,7 +819,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Dashboards, 1)
@ -837,7 +854,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.True(t, deletedDashs[1])
@ -846,9 +863,83 @@ func TestService(t *testing.T) {
})
t.Run("label mapping", func(t *testing.T) {
testLabelMappingV2ApplyFn := func(t *testing.T, filename string, numExpected int, settersFn func() []ServiceSetterFn) {
testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) {
t.Helper()
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
l.ID = influxdb.ID(rand.Int())
return nil
}
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error {
if mapping.ResourceID == 0 {
return errors.New("did not get a resource ID")
}
if mapping.ResourceType == "" {
return errors.New("did not get a resource type")
}
return nil
}
svc := newTestService(append(settersFn(),
WithLabelSVC(fakeLabelSVC),
WithLogger(zaptest.NewLogger(t)),
)...)
orgID := influxdb.ID(9000)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
assert.Equal(t, numExpected, fakeLabelSVC.CreateLabelMappingCalls.Count())
})
}
testLabelMappingV2RollbackFn := func(t *testing.T, filename string, killCount int, settersFn func() []ServiceSetterFn) {
testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) {
t.Helper()
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
l.ID = influxdb.ID(fakeLabelSVC.CreateLabelCalls.Count() + 1)
return nil
}
fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, m *influxdb.LabelMapping) error {
t.Logf("delete: %+v", m)
return nil
}
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error {
t.Logf("create: %+v", mapping)
if mapping.ResourceID == 0 {
return errors.New("did not get a resource ID")
}
if mapping.ResourceType == "" {
return errors.New("did not get a resource type")
}
if fakeLabelSVC.CreateLabelMappingCalls.Count() == killCount {
return errors.New("hit last label")
}
return nil
}
svc := newTestService(append(settersFn(),
WithLabelSVC(fakeLabelSVC),
WithLogger(zaptest.NewLogger(t)),
)...)
orgID := influxdb.ID(9000)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelMappingCalls.Count(), killCount)
})
}
testLabelMappingFn := func(t *testing.T, filename string, numExpected int, settersFn func() []ServiceSetterFn) {
t.Run("applies successfully", func(t *testing.T) {
testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) {
t.Helper()
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error {
l.ID = influxdb.ID(rand.Int())
@ -870,7 +961,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
assert.Equal(t, numExpected, fakeLabelSVC.CreateLabelMappingCalls.Count())
@ -879,6 +970,8 @@ func TestService(t *testing.T) {
t.Run("deletes new label mappings on error", func(t *testing.T) {
testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) {
t.Helper()
for _, l := range pkg.mLabels {
for resource, vals := range l.mappings {
// create extra label mappings, enough for delete to have head room
@ -893,7 +986,12 @@ func TestService(t *testing.T) {
l.ID = influxdb.ID(fakeLabelSVC.CreateLabelCalls.Count() + 1)
return nil
}
fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, m *influxdb.LabelMapping) error {
t.Logf("delete: %+v", m)
return nil
}
fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error {
t.Logf("create: %+v", mapping)
if mapping.ResourceID == 0 {
return errors.New("did not get a resource ID")
}
@ -912,7 +1010,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelMappingCalls.Count(), numExpected)
@ -921,23 +1019,26 @@ func TestService(t *testing.T) {
}
t.Run("maps buckets with labels", func(t *testing.T) {
testLabelMappingFn(
t,
"testdata/bucket_associates_label.yml",
4,
func() []ServiceSetterFn {
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.CreateBucketFn = func(_ context.Context, b *influxdb.Bucket) error {
b.ID = influxdb.ID(rand.Int())
return nil
}
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) {
// forces the bucket to be created a new
return nil, errors.New("an error")
}
return []ServiceSetterFn{WithBucketSVC(fakeBktSVC)}
},
)
bktOpt := func() []ServiceSetterFn {
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.CreateBucketFn = func(_ context.Context, b *influxdb.Bucket) error {
b.ID = influxdb.ID(rand.Int())
return nil
}
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) {
// forces the bucket to be created a new
return nil, errors.New("an error")
}
return []ServiceSetterFn{WithBucketSVC(fakeBktSVC)}
}
t.Run("applies successfully", func(t *testing.T) {
testLabelMappingV2ApplyFn(t, "testdata/bucket_associates_label.yml", 4, bktOpt)
})
t.Run("deletes new label mappings on error", func(t *testing.T) {
testLabelMappingV2RollbackFn(t, "testdata/bucket_associates_label.yml", 2, bktOpt)
})
})
t.Run("maps checks with labels", func(t *testing.T) {
@ -1100,7 +1201,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.NotificationEndpoints, 5)
@ -1153,7 +1254,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 5)
@ -1189,7 +1290,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.NotificationRules, 1)
@ -1240,7 +1341,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.Equal(t, 1, fakeRuleStore.DeleteNotificationRuleCalls.Count())
@ -1274,7 +1375,7 @@ func TestService(t *testing.T) {
svc := newTestService(WithTaskSVC(fakeTaskSVC))
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Tasks, 2)
@ -1302,7 +1403,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.Equal(t, 1, fakeTaskSVC.DeleteTaskCalls.Count())
@ -1323,7 +1424,7 @@ func TestService(t *testing.T) {
svc := newTestService(WithTelegrafSVC(fakeTeleSVC))
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.TelegrafConfigs, 1)
@ -1355,7 +1456,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.Equal(t, 1, fakeTeleSVC.DeleteTelegrafConfigCalls.Count())
@ -1376,7 +1477,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Variables, 4)
@ -1409,7 +1510,7 @@ func TestService(t *testing.T) {
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, 0, pkg)
_, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeVarSVC.DeleteVariableCalls.Count(), 1)
@ -1420,20 +1521,21 @@ func TestService(t *testing.T) {
testfileRunner(t, "testdata/variables.yml", func(t *testing.T, pkg *Pkg) {
orgID := influxdb.ID(9000)
pkg.isVerified = true
pkgLabel := pkg.mVariables["var_const_3"]
pkgLabel.existing = &influxdb.Variable{
// makes all pkg changes same as they are on the existing
ID: influxdb.ID(1),
OrganizationID: orgID,
Name: pkgLabel.Name(),
Arguments: &influxdb.VariableArguments{
Type: "constant",
Values: influxdb.VariableConstantValues{"first val"},
},
}
fakeVarSVC := mock.NewVariableService()
fakeVarSVC.FindVariablesF = func(ctx context.Context, f influxdb.VariableFilter, _ ...influxdb.FindOptions) ([]*influxdb.Variable, error) {
return []*influxdb.Variable{
{
// makes all pkg changes same as they are on the existing
ID: influxdb.ID(1),
OrganizationID: orgID,
Name: pkg.mVariables["var_const_3"].Name(),
Arguments: &influxdb.VariableArguments{
Type: "constant",
Values: influxdb.VariableConstantValues{"first val"},
},
},
}, nil
}
fakeVarSVC.CreateVariableF = func(_ context.Context, l *influxdb.Variable) error {
if l.Name == "var_const" {
return errors.New("shouldn't get here")
@ -1449,7 +1551,7 @@ func TestService(t *testing.T) {
svc := newTestService(WithVariableSVC(fakeVarSVC))
sum, err := svc.Apply(context.TODO(), orgID, 0, pkg)
sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg)
require.NoError(t, err)
require.Len(t, sum.Variables, 4)

View File

@ -39,7 +39,7 @@ func (s *traceMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk
return s.next.DryRun(ctx, orgID, userID, pkg, opts...)
}
func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, err error) {
func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, diff Diff, err error) {
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "Apply")
span.LogKV("orgID", orgID.String(), "userID", userID.String())
defer span.Finish()

View File

@ -153,7 +153,7 @@ func (s *Service) UpdateOrganization(ctx context.Context, id influxdb.ID, upd in
return org, nil
}
// Removes a organization by ID.
// DeleteOrganization removes an organization by ID and its dependent resources.
func (s *Service) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
err := s.store.Update(ctx, func(tx kv.Tx) error {
// clean up the buckets for this organization