diff --git a/cmd/influx/pkg.go b/cmd/influx/pkg.go index bfeb176a92..ece0f06876 100644 --- a/cmd/influx/pkg.go +++ b/cmd/influx/pkg.go @@ -187,7 +187,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn(cmd *cobra.Command, args []string) error opts = append(opts, pkger.ApplyWithSecrets(providedSecrets)) - summary, err := svc.Apply(context.Background(), influxOrgID, 0, pkg, opts...) + summary, _, err := svc.Apply(context.Background(), influxOrgID, 0, pkg, opts...) if err != nil { return err } @@ -973,18 +973,18 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) error { Title("Label Associations"). SetHeaders( "Resource Type", - "Resource Name", "Resource ID", - "Label Name", "Label ID", + "Resource Package Name", "Resource Name", "Resource ID", + "Label Package Name", "Label Name", "Label ID", ) for _, m := range diff.LabelMappings { newRow := []string{ string(m.ResType), - m.ResName, m.ResID.String(), - m.LabelName, m.LabelID.String(), + m.ResPkgName, m.ResName, m.ResID.String(), + m.LabelPkgName, m.LabelName, m.LabelID.String(), } oldRow := newRow - if m.IsNew { + if pkger.IsNew(m.StateStatus) { oldRow = nil } printer.AppendDiff(oldRow, newRow) diff --git a/cmd/influx/pkg_test.go b/cmd/influx/pkg_test.go index cd281f12b2..483ed33f81 100644 --- a/cmd/influx/pkg_test.go +++ b/cmd/influx/pkg_test.go @@ -705,7 +705,7 @@ type fakePkgSVC struct { initStackFn func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) createFn func(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg) (pkger.Summary, pkger.Diff, error) - applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) + applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) } func (f *fakePkgSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) { @@ -729,7 +729,7 @@ func (f *fakePkgSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg panic("not implemented") } -func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) { +func (f *fakePkgSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) { if f.applyFn != nil { return f.applyFn(ctx, orgID, userID, pkg, opts...) } diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index c1e0d6fd24..13678622d9 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -24,6 +24,7 @@ func TestLauncher_Pkger(t *testing.T) { l := RunTestLauncherOrFail(t, ctx) l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) + require.NoError(t, l.BucketService(t).DeleteBucket(ctx, l.Bucket.ID)) svc := l.PkgerService(t) @@ -151,9 +152,8 @@ func TestLauncher_Pkger(t *testing.T) { ) var initialSum pkger.Summary - t.Log("apply pkg with stack id") - { - sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, initialPkg, applyOpt) + t.Run("apply pkg with stack id", func(t *testing.T) { + sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, initialPkg, applyOpt) require.NoError(t, err) initialSum = sum @@ -198,7 +198,7 @@ func TestLauncher_Pkger(t *testing.T) { actualVar := resourceCheck.mustGetVariable(t, byName("var char")) assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID)) } - } + }) var ( updateBucketName = "new bucket" @@ -207,8 +207,7 @@ func TestLauncher_Pkger(t *testing.T) { updateLabelName = "new label" updateVariableName = "new variable" ) - t.Log("apply pkg with stack id where resources change") - { + t.Run("apply pkg with stack id where resources change", func(t *testing.T) { updatedPkg := newPkg( newBucketObject(initialBucketPkgName, updateBucketName, ""), newCheckDeadmanObject(t, initialCheckPkgName, updateCheckName, time.Hour), @@ -216,7 +215,7 @@ func TestLauncher_Pkger(t *testing.T) { newLabelObject(initialLabelPkgName, updateLabelName, "", ""), newVariableObject(initialVariablePkgName, updateVariableName, ""), ) - sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt) + sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, updatedPkg, applyOpt) require.NoError(t, err) require.Len(t, sum.Buckets, 1) @@ -257,10 +256,9 @@ func TestLauncher_Pkger(t *testing.T) { actualVar := resourceCheck.mustGetVariable(t, byName(updateVariableName)) assert.Equal(t, sum.Variables[0].ID, pkger.SafeID(actualVar.ID)) } - } + }) - t.Log("an error during application roles back resources to previous state") - { + t.Run("an error during application roles back resources to previous state", func(t *testing.T) { logger := l.log.With(zap.String("service", "pkger")) var svc pkger.SVC = pkger.NewService( pkger.WithLogger(logger), @@ -288,9 +286,27 @@ func TestLauncher_Pkger(t *testing.T) { newEndpointHTTP("z_endpoint_rolls_back", "", ""), newVariableObject("z_var_rolls_back", "", ""), ) - _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt) + _, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkgWithDelete, applyOpt) require.Error(t, err) + t.Log("\tvalidate all resources are rolled back") + { + actualBkt := resourceCheck.mustGetBucket(t, byName(updateBucketName)) + assert.NotEqual(t, initialSum.Buckets[0].ID, pkger.SafeID(actualBkt.ID)) + + actualCheck := resourceCheck.mustGetCheck(t, byName(updateCheckName)) + assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), actualCheck.GetID()) + + actualEndpoint := resourceCheck.mustGetEndpoint(t, byName(updateEndpointName)) + assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpoint.GetID()) + + actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName)) + assert.NotEqual(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID)) + + actualVariable := resourceCheck.mustGetVariable(t, byName(updateVariableName)) + assert.NotEqual(t, initialSum.Variables[0].ID, pkger.SafeID(actualVariable.ID)) + } + t.Log("\tvalidate all changes do not persist") { for _, name := range []string{"z_roll_me_back", "z_rolls_back_too"} { @@ -310,28 +326,9 @@ func TestLauncher_Pkger(t *testing.T) { _, err = resourceCheck.getVariable(t, byName("z_var_rolls_back")) require.Error(t, err) } + }) - t.Log("\tvalidate all resources are rolled back") - { - actualBkt := resourceCheck.mustGetBucket(t, byName(updateBucketName)) - assert.NotEqual(t, initialSum.Buckets[0].ID, pkger.SafeID(actualBkt.ID)) - - actualCheck := resourceCheck.mustGetCheck(t, byName(updateCheckName)) - assert.NotEqual(t, initialSum.Checks[0].Check.GetID(), actualCheck.GetID()) - - actualEndpoint := resourceCheck.mustGetEndpoint(t, byName(updateEndpointName)) - assert.NotEqual(t, initialSum.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpoint.GetID()) - - actualLabel := resourceCheck.mustGetLabel(t, byName(updateLabelName)) - assert.NotEqual(t, initialSum.Labels[0].ID, pkger.SafeID(actualLabel.ID)) - - actualVariable := resourceCheck.mustGetVariable(t, byName(updateVariableName)) - assert.NotEqual(t, initialSum.Variables[0].ID, pkger.SafeID(actualVariable.ID)) - } - } - - t.Log("apply pkg with stack id where resources have been removed since last run") - { + t.Run("apply pkg with stack id where resources have been removed since last run", func(t *testing.T) { allNewResourcesPkg := newPkg( newBucketObject("non_existent_bucket", "", ""), newCheckDeadmanObject(t, "non_existent_check", "", time.Minute), @@ -339,7 +336,7 @@ func TestLauncher_Pkger(t *testing.T) { newLabelObject("non_existent_label", "", "", ""), newVariableObject("non_existent_var", "", ""), ) - sum, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt) + sum, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, allNewResourcesPkg, applyOpt) require.NoError(t, err) require.Len(t, sum.Buckets, 1) @@ -413,7 +410,7 @@ func TestLauncher_Pkger(t *testing.T) { _, err = resourceCheck.getVariable(t, byName(updateVariableName)) require.Error(t, err) } - } + }) }) }) @@ -433,7 +430,7 @@ func TestLauncher_Pkger(t *testing.T) { pkger.WithVariableSVC(l.VariableService(t)), ) - _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, newPkg(t)) + _, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, newPkg(t)) require.Error(t, err) bkts, _, err := l.BucketService(t).FindBuckets(ctx, influxdb.BucketFilter{OrganizationID: &l.Org.ID}) @@ -503,17 +500,6 @@ func TestLauncher_Pkger(t *testing.T) { } } - hasMapping := func(t *testing.T, actuals []pkger.SummaryLabelMapping, expected pkger.SummaryLabelMapping) { - t.Helper() - - for _, actual := range actuals { - if actual == expected { - return - } - } - require.FailNowf(t, "did not find expected mapping", "expected: %v", expected) - } - t.Run("dry run a package with no existing resources", func(t *testing.T) { sum, diff, err := svc.DryRun(ctx, l.Org.ID, l.User.ID, newPkg(t)) require.NoError(t, err) @@ -639,137 +625,114 @@ spec: t.Run("apply a package of all new resources", func(t *testing.T) { // this initial test is also setup for the sub tests - sum1, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t)) + sum1, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t)) require.NoError(t, err) - verifyCompleteSummary := func(t *testing.T, sum1 pkger.Summary, exportAllSum bool) { - t.Helper() + labels := sum1.Labels + require.Len(t, labels, 2) + assert.NotZero(t, labels[0].ID) + assert.Equal(t, "label_1", labels[0].Name) + assert.Equal(t, "the 2nd label", labels[1].Name) - labels := sum1.Labels - require.Len(t, labels, 2) - if !exportAllSum { - assert.NotZero(t, labels[0].ID) - } - assert.Equal(t, "label_1", labels[0].Name) - assert.Equal(t, "the 2nd label", labels[1].Name) + bkts := sum1.Buckets + require.Len(t, bkts, 1) + assert.NotZero(t, bkts[0].ID) + assert.NotEmpty(t, bkts[0].PkgName) + assert.Equal(t, "rucketeer", bkts[0].Name) + hasLabelAssociations(t, bkts[0].LabelAssociations, 2, "label_1", "the 2nd label") - bkts := sum1.Buckets - if exportAllSum { - require.Len(t, bkts, 2) - assert.Equal(t, l.Bucket.Name, bkts[0].Name) - bkts = bkts[1:] - } - require.Len(t, bkts, 1) - if !exportAllSum { - assert.NotZero(t, bkts[0].ID) - } - assert.Equal(t, "rucketeer", bkts[0].Name) - hasLabelAssociations(t, bkts[0].LabelAssociations, 2, "label_1", "the 2nd label") - - checks := sum1.Checks - require.Len(t, checks, 2) - assert.Equal(t, "check 0 name", checks[0].Check.GetName()) - hasLabelAssociations(t, checks[0].LabelAssociations, 1, "label_1") - assert.Equal(t, "check_1", checks[1].Check.GetName()) - hasLabelAssociations(t, checks[1].LabelAssociations, 1, "label_1") - for _, ch := range checks { - if !exportAllSum { - assert.NotZero(t, ch.Check.GetID()) - } - } - - dashs := sum1.Dashboards - require.Len(t, dashs, 1) - if !exportAllSum { - assert.NotZero(t, dashs[0].ID) - } - assert.Equal(t, "dash_1", dashs[0].Name) - assert.Equal(t, "desc1", dashs[0].Description) - hasLabelAssociations(t, dashs[0].LabelAssociations, 2, "label_1", "the 2nd label") - require.Len(t, dashs[0].Charts, 1) - assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType()) - - endpoints := sum1.NotificationEndpoints - require.Len(t, endpoints, 1) - if !exportAllSum { - assert.NotZero(t, endpoints[0].NotificationEndpoint.GetID()) - } - assert.Equal(t, "no auth endpoint", endpoints[0].NotificationEndpoint.GetName()) - assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription()) - assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus())) - hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1") - - require.Len(t, sum1.NotificationRules, 1) - rule := sum1.NotificationRules[0] - if !exportAllSum { - assert.NotZero(t, rule.ID) - } - assert.Equal(t, "rule_0", rule.Name) - assert.Equal(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), rule.EndpointID) - if !exportAllSum { - assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName) - assert.Equalf(t, "http", rule.EndpointType, "rule: %+v", rule) - } else { - assert.NotEmpty(t, rule.EndpointName) - } - - require.Len(t, sum1.Tasks, 1) - task := sum1.Tasks[0] - if !exportAllSum { - assert.NotZero(t, task.ID) - } - assert.Equal(t, "task_1", task.Name) - assert.Equal(t, "desc_1", task.Description) - - teles := sum1.TelegrafConfigs - require.Len(t, teles, 1) - if !exportAllSum { - assert.NotZero(t, teles[0].TelegrafConfig.ID) - assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID) - } - assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name) - assert.Equal(t, "desc", teles[0].TelegrafConfig.Description) - assert.Equal(t, telConf, teles[0].TelegrafConfig.Config) - - vars := sum1.Variables - require.Len(t, vars, 1) - if !exportAllSum { - assert.NotZero(t, vars[0].ID) - } - assert.Equal(t, "query var", vars[0].Name) - hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1") - varArgs := vars[0].Arguments - require.NotNil(t, varArgs) - assert.Equal(t, "query", varArgs.Type) - assert.Equal(t, influxdb.VariableQueryValues{ - Query: "buckets() |> filter(fn: (r) => r.name !~ /^_/) |> rename(columns: {name: \"_value\"}) |> keep(columns: [\"_value\"])", - Language: "flux", - }, varArgs.Values) - - newSumMapping := func(id pkger.SafeID, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping { - return pkger.SummaryLabelMapping{ - ResourceName: name, - LabelName: labels[0].Name, - LabelID: labels[0].ID, - ResourceID: id, - ResourceType: rt, - } - } - - mappings := sum1.LabelMappings - require.Len(t, mappings, 11) - hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType)) - hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType)) - hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType)) - hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType)) - hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType)) - hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType)) - hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType)) - hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType)) - hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType)) + checks := sum1.Checks + require.Len(t, checks, 2) + assert.Equal(t, "check 0 name", checks[0].Check.GetName()) + hasLabelAssociations(t, checks[0].LabelAssociations, 1, "label_1") + assert.Equal(t, "check_1", checks[1].Check.GetName()) + hasLabelAssociations(t, checks[1].LabelAssociations, 1, "label_1") + for _, ch := range checks { + assert.NotZero(t, ch.Check.GetID()) } - verifyCompleteSummary(t, sum1, false) + dashs := sum1.Dashboards + require.Len(t, dashs, 1) + assert.NotZero(t, dashs[0].ID) + assert.NotEmpty(t, dashs[0].Name) + assert.Equal(t, "dash_1", dashs[0].Name) + assert.Equal(t, "desc1", dashs[0].Description) + hasLabelAssociations(t, dashs[0].LabelAssociations, 2, "label_1", "the 2nd label") + require.Len(t, dashs[0].Charts, 1) + assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType()) + + endpoints := sum1.NotificationEndpoints + require.Len(t, endpoints, 1) + assert.NotZero(t, endpoints[0].NotificationEndpoint.GetID()) + assert.Equal(t, "no auth endpoint", endpoints[0].NotificationEndpoint.GetName()) + assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription()) + assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus())) + hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1") + + require.Len(t, sum1.NotificationRules, 1) + rule := sum1.NotificationRules[0] + assert.NotZero(t, rule.ID) + assert.Equal(t, "rule_0", rule.Name) + assert.Equal(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), rule.EndpointID) + assert.Equal(t, "http_none_auth_notification_endpoint", rule.EndpointName) + assert.Equalf(t, "http", rule.EndpointType, "rule: %+v", rule) + + require.Len(t, sum1.Tasks, 1) + task := sum1.Tasks[0] + assert.NotZero(t, task.ID) + assert.Equal(t, "task_1", task.Name) + assert.Equal(t, "desc_1", task.Description) + + teles := sum1.TelegrafConfigs + require.Len(t, teles, 1) + assert.NotZero(t, teles[0].TelegrafConfig.ID) + assert.Equal(t, l.Org.ID, teles[0].TelegrafConfig.OrgID) + assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name) + assert.Equal(t, "desc", teles[0].TelegrafConfig.Description) + assert.Equal(t, telConf, teles[0].TelegrafConfig.Config) + + vars := sum1.Variables + require.Len(t, vars, 1) + assert.NotZero(t, vars[0].ID) + assert.Equal(t, "query var", vars[0].Name) + hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1") + varArgs := vars[0].Arguments + require.NotNil(t, varArgs) + assert.Equal(t, "query", varArgs.Type) + assert.Equal(t, influxdb.VariableQueryValues{ + Query: "buckets() |> filter(fn: (r) => r.name !~ /^_/) |> rename(columns: {name: \"_value\"}) |> keep(columns: [\"_value\"])", + Language: "flux", + }, varArgs.Values) + + newSumMapping := func(id pkger.SafeID, pkgName, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping { + return pkger.SummaryLabelMapping{ + Status: pkger.StateStatusNew, + ResourceID: id, + ResourceType: rt, + ResourcePkgName: pkgName, + ResourceName: name, + LabelPkgName: labels[0].PkgName, + LabelName: labels[0].Name, + LabelID: labels[0].ID, + } + } + + mappings := sum1.LabelMappings + + mappingsContain := func(t *testing.T, id pkger.SafeID, pkgName, name string, resourceType influxdb.ResourceType) { + t.Helper() + assert.Contains(t, mappings, newSumMapping(id, pkgName, name, resourceType)) + } + + require.Len(t, mappings, 11) + mappingsContain(t, bkts[0].ID, bkts[0].PkgName, bkts[0].Name, influxdb.BucketsResourceType) + mappingsContain(t, pkger.SafeID(checks[0].Check.GetID()), checks[0].PkgName, checks[0].Check.GetName(), influxdb.ChecksResourceType) + mappingsContain(t, dashs[0].ID, dashs[0].PkgName, dashs[0].Name, influxdb.DashboardsResourceType) + mappingsContain(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].PkgName, endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType) + mappingsContain(t, rule.ID, rule.PkgName, rule.Name, influxdb.NotificationRuleResourceType) + mappingsContain(t, task.ID, task.PkgName, task.Name, influxdb.TasksResourceType) + mappingsContain(t, pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].PkgName, teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType) + mappingsContain(t, vars[0].ID, vars[0].PkgName, vars[0].Name, influxdb.VariablesResourceType) var ( // used in dependent subtests @@ -793,7 +756,95 @@ spec: )) require.NoError(t, err) - verifyCompleteSummary(t, newPkg.Summary(), true) + sum := newPkg.Summary() + + labels := sum.Labels + require.Len(t, labels, 2) + assert.Equal(t, "label_1", labels[0].Name) + assert.Equal(t, "the 2nd label", labels[1].Name) + + bkts := sum.Buckets + require.Len(t, bkts, 1) + assert.NotEmpty(t, bkts[0].PkgName) + assert.Equal(t, "rucketeer", bkts[0].Name) + hasLabelAssociations(t, bkts[0].LabelAssociations, 2, "label_1", "the 2nd label") + + checks := sum.Checks + require.Len(t, checks, 2) + assert.Equal(t, "check 0 name", checks[0].Check.GetName()) + hasLabelAssociations(t, checks[0].LabelAssociations, 1, "label_1") + assert.Equal(t, "check_1", checks[1].Check.GetName()) + hasLabelAssociations(t, checks[1].LabelAssociations, 1, "label_1") + + dashs := sum.Dashboards + require.Len(t, dashs, 1) + assert.NotEmpty(t, dashs[0].Name) + assert.Equal(t, "dash_1", dashs[0].Name) + assert.Equal(t, "desc1", dashs[0].Description) + hasLabelAssociations(t, dashs[0].LabelAssociations, 2, "label_1", "the 2nd label") + require.Len(t, dashs[0].Charts, 1) + assert.Equal(t, influxdb.ViewPropertyTypeSingleStat, dashs[0].Charts[0].Properties.GetType()) + + endpoints := sum.NotificationEndpoints + require.Len(t, endpoints, 1) + assert.Equal(t, "no auth endpoint", endpoints[0].NotificationEndpoint.GetName()) + assert.Equal(t, "http none auth desc", endpoints[0].NotificationEndpoint.GetDescription()) + assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].NotificationEndpoint.GetStatus())) + hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1") + + require.Len(t, sum.NotificationRules, 1) + rule := sum.NotificationRules[0] + assert.Equal(t, "rule_0", rule.Name) + assert.Equal(t, pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), rule.EndpointID) + assert.NotEmpty(t, rule.EndpointName) + + require.Len(t, sum.Tasks, 1) + task := sum.Tasks[0] + assert.Equal(t, "task_1", task.Name) + assert.Equal(t, "desc_1", task.Description) + + teles := sum.TelegrafConfigs + require.Len(t, teles, 1) + assert.Equal(t, "first tele config", teles[0].TelegrafConfig.Name) + assert.Equal(t, "desc", teles[0].TelegrafConfig.Description) + assert.Equal(t, telConf, teles[0].TelegrafConfig.Config) + + vars := sum.Variables + require.Len(t, vars, 1) + assert.Equal(t, "query var", vars[0].Name) + hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1") + varArgs := vars[0].Arguments + require.NotNil(t, varArgs) + assert.Equal(t, "query", varArgs.Type) + assert.Equal(t, influxdb.VariableQueryValues{ + Query: "buckets() |> filter(fn: (r) => r.name !~ /^_/) |> rename(columns: {name: \"_value\"}) |> keep(columns: [\"_value\"])", + Language: "flux", + }, varArgs.Values) + + newSumMapping := func(id pkger.SafeID, pkgName, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping { + return pkger.SummaryLabelMapping{ + Status: pkger.StateStatusNew, + ResourceID: id, + ResourceType: rt, + ResourcePkgName: pkgName, + ResourceName: name, + LabelPkgName: labels[0].PkgName, + LabelName: labels[0].Name, + LabelID: labels[0].ID, + } + } + + mappings := sum.LabelMappings + require.Len(t, mappings, 11) + assert.Contains(t, mappings, newSumMapping(bkts[0].ID, bkts[0].PkgName, bkts[0].Name, influxdb.BucketsResourceType)) + //hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[0].Check.GetID()), checks[0].Check.GetName(), influxdb.ChecksResourceType)) + //hasMapping(t, mappings, newSumMapping(pkger.SafeID(checks[1].Check.GetID()), checks[1].Check.GetName(), influxdb.ChecksResourceType)) + //hasMapping(t, mappings, newSumMapping(dashs[0].ID, dashs[0].Name, influxdb.DashboardsResourceType)) + //hasMapping(t, mappings, newSumMapping(pkger.SafeID(endpoints[0].NotificationEndpoint.GetID()), endpoints[0].NotificationEndpoint.GetName(), influxdb.NotificationEndpointResourceType)) + //hasMapping(t, mappings, newSumMapping(rule.ID, rule.Name, influxdb.NotificationRuleResourceType)) + //hasMapping(t, mappings, newSumMapping(task.ID, task.Name, influxdb.TasksResourceType)) + //hasMapping(t, mappings, newSumMapping(pkger.SafeID(teles[0].TelegrafConfig.ID), teles[0].TelegrafConfig.Name, influxdb.TelegrafsResourceType)) + //hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType)) }) t.Run("filtered by resource types", func(t *testing.T) { @@ -883,7 +934,7 @@ spec: t.Run("pkg with same bkt-var-label does nto create new resources for them", func(t *testing.T) { // validate the new package doesn't create new resources for bkts/labels/vars // since names collide. - sum2, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t)) + sum2, _, err := svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, newPkg(t)) require.NoError(t, err) require.Equal(t, sum1.Buckets, sum2.Buckets) @@ -901,7 +952,7 @@ spec: pkg, err := pkger.Parse(pkger.EncodingYAML, pkger.FromString(pkgStr)) require.NoError(t, err) - sum, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg) + sum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg) require.NoError(t, err) return sum } @@ -1095,7 +1146,7 @@ spec: pkger.WithVariableSVC(l.VariableService(t)), ) - _, err = svc.Apply(ctx, l.Org.ID, 0, updatePkg) + _, _, err = svc.Apply(ctx, l.Org.ID, 0, updatePkg) require.Error(t, err) bkt, err := l.BucketService(t).FindBucketByID(ctx, influxdb.ID(sum1Bkts[0].ID)) @@ -1175,7 +1226,7 @@ spec: pkg, err := pkger.Parse(pkger.EncodingYAML, pkger.FromString(pkgStr)) require.NoError(t, err) - sum, err := svc.Apply(timedCtx(time.Second), l.Org.ID, l.User.ID, pkg) + sum, _, err := svc.Apply(timedCtx(time.Second), l.Org.ID, l.User.ID, pkg) require.NoError(t, err) require.Len(t, sum.Tasks, 1) @@ -1321,7 +1372,7 @@ spec: } assert.Equal(t, expectedMissingEnvs, sum.MissingEnvs) - sum, err = svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{ + sum, _, err = svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{ "bkt-1-name-ref": "rucket_threeve", "check-1-name-ref": "check_threeve", "dash-1-name-ref": "dash_threeve", diff --git a/http/swagger.yml b/http/swagger.yml index e9d41a13fd..321d523ceb 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -7360,6 +7360,8 @@ components: type: "string" orgID: type: "string" + pkgName: + type: string name: type: "string" description: @@ -7377,12 +7379,18 @@ components: items: type: object properties: + status: + type: string + resourcePkgName: + type: string resourceName: type: string resourceID: type: string resourceType: type: string + labelPkgName: + type: string labelName: type: string labelID: @@ -7413,6 +7421,8 @@ components: items: type: object properties: + pkgName: + type: string name: type: string description: @@ -7460,6 +7470,8 @@ components: items: type: object properties: + pkgName: + type: string id: type: string name: @@ -7483,6 +7495,8 @@ components: - $ref: "#/components/schemas/TelegrafRequest" - type: object properties: + pkgName: + type: string labelAssociations: type: array items: @@ -7622,16 +7636,20 @@ components: items: type: object properties: - isNew: - type: boolean + status: + type: string resourceType: type: string resourceID: type: string + resourcePkgName: + type: string resourceName: type: string labelID: type: string + labelPkgName: + type: string labelName: type: string notificationEndpoints: diff --git a/pkger/http_remote_service.go b/pkger/http_remote_service.go index 8c272584f7..f462ad5199 100644 --- a/pkger/http_remote_service.go +++ b/pkger/http_remote_service.go @@ -112,9 +112,8 @@ func (s *HTTPRemoteService) DryRun(ctx context.Context, orgID, userID influxdb.I // Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied // in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state // from before the pkg was applied. -func (s *HTTPRemoteService) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) { - sum, _, err := s.apply(ctx, orgID, pkg, false, opts...) - return sum, err +func (s *HTTPRemoteService) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) { + return s.apply(ctx, orgID, pkg, false, opts...) } func (s *HTTPRemoteService) apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg, dryRun bool, opts ...ApplyOptFn) (Summary, Diff, error) { diff --git a/pkger/http_server.go b/pkger/http_server.go index beb598f350..ebe8cee9a7 100644 --- a/pkger/http_server.go +++ b/pkger/http_server.go @@ -355,22 +355,21 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) { ApplyWithStackID(stackID), } - sum, diff, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg, applyOpts...) - if IsParseErr(err) { - s.api.Respond(w, http.StatusUnprocessableEntity, RespApplyPkg{ - Diff: diff, - Summary: sum, - Errors: convertParseErr(err), - }) - return - } - if err != nil { - s.api.Err(w, err) - return - } - - // if only a dry run, then we exit before anything destructive if reqBody.DryRun { + sum, diff, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg, applyOpts...) + if IsParseErr(err) { + s.api.Respond(w, http.StatusUnprocessableEntity, RespApplyPkg{ + Diff: diff, + Summary: sum, + Errors: convertParseErr(err), + }) + return + } + if err != nil { + s.api.Err(w, err) + return + } + s.api.Respond(w, http.StatusOK, RespApplyPkg{ Diff: diff, Summary: sum, @@ -380,7 +379,7 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) { applyOpts = append(applyOpts, ApplyWithSecrets(reqBody.Secrets)) - sum, err = s.svc.Apply(r.Context(), *orgID, userID, parsedPkg, applyOpts...) + sum, diff, err := s.svc.Apply(r.Context(), *orgID, userID, parsedPkg, applyOpts...) if err != nil && !IsParseErr(err) { s.api.Err(w, err) return diff --git a/pkger/http_server_test.go b/pkger/http_server_test.go index 5b50bfd1be..321cea7727 100644 --- a/pkger/http_server_test.go +++ b/pkger/http_server_test.go @@ -407,11 +407,13 @@ func TestPkgerHTTPServer(t *testing.T) { t.Run("apply a pkg", func(t *testing.T) { svc := &fakeSVC{ - dryRunFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) { - if err := pkg.Validate(); err != nil { - return pkger.Summary{}, pkger.Diff{}, err + applyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) { + var opt pkger.ApplyOpt + for _, o := range opts { + o(&opt) } sum := pkg.Summary() + var diff pkger.Diff for _, b := range sum.Buckets { diff.Buckets = append(diff.Buckets, pkger.DiffBucket{ @@ -420,18 +422,10 @@ func TestPkgerHTTPServer(t *testing.T) { }, }) } - return sum, diff, nil - }, - applyFn: func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) { - var opt pkger.ApplyOpt - for _, o := range opts { - o(&opt) - } - sum := pkg.Summary() for key := range opt.MissingSecrets { sum.MissingSecrets = append(sum.MissingSecrets, key) } - return sum, nil + return sum, diff, nil }, } @@ -647,7 +641,7 @@ func decodeBody(t *testing.T, r io.Reader, v interface{}) { type fakeSVC struct { initStack func(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) dryRunFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) - applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) + applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) } var _ pkger.SVC = (*fakeSVC)(nil) @@ -671,7 +665,7 @@ func (f *fakeSVC) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *pk return f.dryRunFn(ctx, orgID, userID, pkg, opts...) } -func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) { +func (f *fakeSVC) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error) { if f.applyFn == nil { panic("not implemented") } diff --git a/pkger/models.go b/pkger/models.go index aec28ae627..dea723ad53 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -8,7 +8,6 @@ import ( "reflect" "regexp" "sort" - "strconv" "strings" "time" @@ -193,7 +192,7 @@ type ( DiffIdentifier New DiffBucketValues `json:"new"` - Old *DiffBucketValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil + Old *DiffBucketValues `json:"old"` } // DiffBucketValues are the varying values for a bucket. @@ -204,32 +203,6 @@ type ( } ) -func newDiffBucket(b *bucket, i *influxdb.Bucket) DiffBucket { - diff := DiffBucket{ - DiffIdentifier: DiffIdentifier{ - ID: SafeID(b.ID()), - Remove: b.shouldRemove, - PkgName: b.PkgName(), - }, - New: DiffBucketValues{ - Name: b.Name(), - Description: b.Description, - RetentionRules: b.RetentionRules, - }, - } - if i != nil { - diff.ID = SafeID(i.ID) - diff.Old = &DiffBucketValues{ - Name: i.Name, - Description: i.Description, - } - if i.RetentionPeriod > 0 { - diff.Old.RetentionRules = retentionRules{newRetentionRule(i.RetentionPeriod)} - } - } - return diff -} - func (d DiffBucket) hasConflict() bool { return !d.IsNew() && d.Old != nil && !reflect.DeepEqual(*d.Old, d.New) } @@ -363,7 +336,7 @@ type ( DiffIdentifier New DiffLabelValues `json:"new"` - Old *DiffLabelValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil + Old *DiffLabelValues `json:"old"` } // DiffLabelValues are the varying values for a label. @@ -374,48 +347,39 @@ type ( } ) -func newDiffLabel(l *label, i *influxdb.Label) DiffLabel { - diff := DiffLabel{ - DiffIdentifier: DiffIdentifier{ - ID: SafeID(l.ID()), - Remove: l.shouldRemove, - PkgName: l.PkgName(), - }, - New: DiffLabelValues{ - Name: l.Name(), - Color: l.Color, - Description: l.Description, - }, - } - if i != nil { - diff.ID = SafeID(i.ID) - diff.Old = &DiffLabelValues{ - Name: i.Name, - Color: i.Properties["color"], - Description: i.Properties["description"], - } - } - return diff -} - func (d DiffLabel) hasConflict() bool { return !d.IsNew() && d.Old != nil && *d.Old != d.New } +// StateStatus indicates the status of a diff or summary resource +type StateStatus string + +const ( + StateStatusExists StateStatus = "exists" + StateStatusNew StateStatus = "new" + StateStatusRemove StateStatus = "remove" +) + // DiffLabelMapping is a diff of an individual label mapping. A // single resource may have multiple mappings to multiple labels. // A label can have many mappings to other resources. type DiffLabelMapping struct { - IsNew bool `json:"isNew"` + StateStatus StateStatus `json:"stateStatus"` - ResType influxdb.ResourceType `json:"resourceType"` - ResID SafeID `json:"resourceID"` - ResName string `json:"resourceName"` + ResType influxdb.ResourceType `json:"resourceType"` + ResID SafeID `json:"resourceID"` + ResName string `json:"resourceName"` + ResPkgName string `json:"resourcePkgName"` - LabelID SafeID `json:"labelID"` - LabelName string `json:"labelName"` + LabelID SafeID `json:"labelID"` + LabelName string `json:"labelName"` + LabelPkgName string `json:"labelPkgName"` } +//func (d DiffLabelMapping) IsNew() bool { +// return d.StateStatus == StateStatusNew +//} + // DiffNotificationEndpointValues are the varying values for a notification endpoint. type DiffNotificationEndpointValues struct { influxdb.NotificationEndpoint @@ -747,6 +711,7 @@ func (s *SummaryCheck) UnmarshalJSON(b []byte) error { type SummaryDashboard struct { ID SafeID `json:"id"` OrgID SafeID `json:"orgID"` + PkgName string `json:"pkgName"` Name string `json:"name"` Description string `json:"description"` Charts []SummaryChart `json:"charts"` @@ -868,6 +833,7 @@ func (s *SummaryNotificationEndpoint) UnmarshalJSON(b []byte) error { type ( SummaryNotificationRule struct { ID SafeID `json:"id"` + PkgName string `json:"pkgName"` Name string `json:"name"` Description string `json:"description"` @@ -911,17 +877,21 @@ type SummaryLabel struct { // SummaryLabelMapping provides a summary of a label mapped with a single resource. type SummaryLabelMapping struct { - exists bool - ResourceID SafeID `json:"resourceID"` - ResourceName string `json:"resourceName"` - ResourceType influxdb.ResourceType `json:"resourceType"` - LabelName string `json:"labelName"` - LabelID SafeID `json:"labelID"` + exists bool + Status StateStatus `json:"status,omitempty"` + ResourceID SafeID `json:"resourceID"` + ResourcePkgName string `json:"resourcePkgName"` + ResourceName string `json:"resourceName"` + ResourceType influxdb.ResourceType `json:"resourceType"` + LabelPkgName string `json:"labelPkgName"` + LabelName string `json:"labelName"` + LabelID SafeID `json:"labelID"` } // SummaryTask provides a summary of a task. type SummaryTask struct { ID SafeID `json:"id"` + PkgName string `json:"pkgName"` Name string `json:"name"` Cron string `json:"cron"` Description string `json:"description"` @@ -935,6 +905,7 @@ type SummaryTask struct { // SummaryTelegraf provides a summary of a pkg telegraf config. type SummaryTelegraf struct { + PkgName string `json:"pkgName"` TelegrafConfig influxdb.TelegrafConfig `json:"telegrafConfig"` LabelAssociations []SummaryLabel `json:"labelAssociations"` } @@ -950,195 +921,6 @@ type SummaryVariable struct { LabelAssociations []SummaryLabel `json:"labelAssociations"` } -type identity struct { - name *references - displayName *references - shouldRemove bool -} - -func (i *identity) Name() string { - if displayName := i.displayName.String(); displayName != "" { - return displayName - } - return i.name.String() -} - -func (i *identity) PkgName() string { - return i.name.String() -} - -const ( - fieldAPIVersion = "apiVersion" - fieldAssociations = "associations" - fieldDescription = "description" - fieldEvery = "every" - fieldKey = "key" - fieldKind = "kind" - fieldLanguage = "language" - fieldLevel = "level" - fieldMin = "min" - fieldMax = "max" - fieldMetadata = "metadata" - fieldName = "name" - fieldOffset = "offset" - fieldOperator = "operator" - fieldPrefix = "prefix" - fieldQuery = "query" - fieldSuffix = "suffix" - fieldSpec = "spec" - fieldStatus = "status" - fieldType = "type" - fieldValue = "value" - fieldValues = "values" -) - -const ( - fieldBucketRetentionRules = "retentionRules" -) - -const bucketNameMinLength = 2 - -type bucket struct { - identity - - id influxdb.ID - OrgID influxdb.ID - Description string - RetentionRules retentionRules - labels sortedLabels - - // existing provides context for a resource that already - // exists in the platform. If a resource already exists - // then it will be referenced here. - existing *influxdb.Bucket -} - -func (b *bucket) ID() influxdb.ID { - if b.existing != nil { - return b.existing.ID - } - return b.id -} - -func (b *bucket) Labels() []*label { - return b.labels -} - -func (b *bucket) ResourceType() influxdb.ResourceType { - return KindBucket.ResourceType() -} - -func (b *bucket) Exists() bool { - return b.existing != nil -} - -func (b *bucket) summarize() SummaryBucket { - return SummaryBucket{ - ID: SafeID(b.ID()), - OrgID: SafeID(b.OrgID), - Name: b.Name(), - PkgName: b.PkgName(), - Description: b.Description, - RetentionPeriod: b.RetentionRules.RP(), - LabelAssociations: toSummaryLabels(b.labels...), - } -} - -func (b *bucket) valid() []validationErr { - var vErrs []validationErr - if err, ok := isValidName(b.Name(), bucketNameMinLength); !ok { - vErrs = append(vErrs, err) - } - vErrs = append(vErrs, b.RetentionRules.valid()...) - if len(vErrs) == 0 { - return nil - } - return []validationErr{ - objectValidationErr(fieldSpec, vErrs...), - } -} - -func (b *bucket) shouldApply() bool { - return b.shouldRemove || - b.existing == nil || - b.Description != b.existing.Description || - b.Name() != b.existing.Name || - b.RetentionRules.RP() != b.existing.RetentionPeriod -} - -type mapperBuckets []*bucket - -func (b mapperBuckets) Association(i int) labelAssociater { - return b[i] -} - -func (b mapperBuckets) Len() int { - return len(b) -} - -const ( - retentionRuleTypeExpire = "expire" -) - -type retentionRule struct { - Type string `json:"type" yaml:"type"` - Seconds int `json:"everySeconds" yaml:"everySeconds"` -} - -func newRetentionRule(d time.Duration) retentionRule { - return retentionRule{ - Type: retentionRuleTypeExpire, - Seconds: int(d.Round(time.Second) / time.Second), - } -} - -func (r retentionRule) valid() []validationErr { - const hour = 3600 - var ff []validationErr - if r.Seconds < hour { - ff = append(ff, validationErr{ - Field: fieldRetentionRulesEverySeconds, - Msg: "seconds must be a minimum of " + strconv.Itoa(hour), - }) - } - if r.Type != retentionRuleTypeExpire { - ff = append(ff, validationErr{ - Field: fieldType, - Msg: `type must be "expire"`, - }) - } - return ff -} - -const ( - fieldRetentionRulesEverySeconds = "everySeconds" -) - -type retentionRules []retentionRule - -func (r retentionRules) RP() time.Duration { - // TODO: this feels very odd to me, will need to follow up with - // team to better understand this - for _, rule := range r { - return time.Duration(rule.Seconds) * time.Second - } - return 0 -} - -func (r retentionRules) valid() []validationErr { - var failures []validationErr - for i, rule := range r { - if ff := rule.valid(); len(ff) > 0 { - failures = append(failures, validationErr{ - Field: fieldBucketRetentionRules, - Index: intPtr(i), - Nested: ff, - }) - } - } - return failures -} - type checkKind int const ( @@ -1389,180 +1171,6 @@ func toInfluxThresholds(thresholds ...threshold) []icheck.ThresholdConfig { return iThresh } -type assocMapKey struct { - resType influxdb.ResourceType - name string -} - -type assocMapVal struct { - exists bool - v interface{} -} - -func (l assocMapVal) ID() influxdb.ID { - if t, ok := l.v.(labelAssociater); ok { - return t.ID() - } - return 0 -} - -type associationMapping struct { - mappings map[assocMapKey][]assocMapVal -} - -func (l *associationMapping) setMapping(v interface { - ResourceType() influxdb.ResourceType - Name() string -}, exists bool) { - if l == nil { - return - } - if l.mappings == nil { - l.mappings = make(map[assocMapKey][]assocMapVal) - } - - k := assocMapKey{ - resType: v.ResourceType(), - name: v.Name(), - } - val := assocMapVal{ - exists: exists, - v: v, - } - existing, ok := l.mappings[k] - if !ok { - l.mappings[k] = []assocMapVal{val} - return - } - for i, ex := range existing { - if ex.v == v { - existing[i].exists = exists - return - } - } - l.mappings[k] = append(l.mappings[k], val) -} - -const ( - fieldLabelColor = "color" -) - -const labelNameMinLength = 2 - -type label struct { - identity - - id influxdb.ID - OrgID influxdb.ID - Color string - Description string - associationMapping - - // exists provides context for a resource that already - // exists in the platform. If a resource already exists(exists=true) - // then the ID should be populated. - existing *influxdb.Label -} - -func (l *label) ID() influxdb.ID { - if l.existing != nil { - return l.existing.ID - } - return l.id -} - -func (l *label) shouldApply() bool { - return l.existing == nil || - l.Description != l.existing.Properties["description"] || - l.Name() != l.existing.Name || - l.Color != l.existing.Properties["color"] -} - -func (l *label) summarize() SummaryLabel { - return SummaryLabel{ - ID: SafeID(l.ID()), - OrgID: SafeID(l.OrgID), - PkgName: l.PkgName(), - Name: l.Name(), - Properties: struct { - Color string `json:"color"` - Description string `json:"description"` - }{ - Color: l.Color, - Description: l.Description, - }, - } -} - -func (l *label) mappingSummary() []SummaryLabelMapping { - var mappings []SummaryLabelMapping - for resource, vals := range l.mappings { - for _, v := range vals { - mappings = append(mappings, SummaryLabelMapping{ - exists: v.exists, - ResourceID: SafeID(v.ID()), - ResourceName: resource.name, - ResourceType: resource.resType, - LabelID: SafeID(l.ID()), - LabelName: l.Name(), - }) - } - } - - return mappings -} - -func (l *label) properties() map[string]string { - return map[string]string{ - "color": l.Color, - "description": l.Description, - } -} - -func (l *label) toInfluxLabel() influxdb.Label { - return influxdb.Label{ - ID: l.ID(), - OrgID: l.OrgID, - Name: l.Name(), - Properties: l.properties(), - } -} - -func (l *label) valid() []validationErr { - var vErrs []validationErr - if err, ok := isValidName(l.Name(), labelNameMinLength); !ok { - vErrs = append(vErrs, err) - } - if len(vErrs) == 0 { - return nil - } - return []validationErr{ - objectValidationErr(fieldSpec, vErrs...), - } -} - -func toSummaryLabels(labels ...*label) []SummaryLabel { - iLabels := make([]SummaryLabel, 0, len(labels)) - for _, l := range labels { - iLabels = append(iLabels, l.summarize()) - } - return iLabels -} - -type sortedLabels []*label - -func (s sortedLabels) Len() int { - return len(s) -} - -func (s sortedLabels) Less(i, j int) bool { - return s[i].Name() < s[j].Name() -} - -func (s sortedLabels) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - type notificationKind int const ( @@ -1858,6 +1466,7 @@ func (r *notificationRule) Status() influxdb.Status { func (r *notificationRule) summarize() SummaryNotificationRule { return SummaryNotificationRule{ ID: SafeID(r.ID()), + PkgName: r.PkgName(), Name: r.Name(), EndpointID: SafeID(r.endpointID), EndpointName: r.endpointName.String(), @@ -2133,6 +1742,7 @@ func (t *task) flux() string { func (t *task) summarize() SummaryTask { return SummaryTask{ ID: SafeID(t.ID()), + PkgName: t.PkgName(), Name: t.Name(), Cron: t.cron, Description: t.description, @@ -2230,6 +1840,7 @@ func (t *telegraf) summarize() SummaryTelegraf { cfg := t.config cfg.Name = t.Name() return SummaryTelegraf{ + PkgName: t.PkgName(), TelegrafConfig: cfg, LabelAssociations: toSummaryLabels(t.labels...), } @@ -2441,6 +2052,7 @@ func (d *dashboard) summarize() SummaryDashboard { iDash := SummaryDashboard{ ID: SafeID(d.ID()), OrgID: SafeID(d.OrgID), + PkgName: d.PkgName(), Name: d.Name(), Description: d.Description, LabelAssociations: toSummaryLabels(d.labels...), diff --git a/pkger/models_test.go b/pkger/models_test.go index 669e1a564b..71d07d0611 100644 --- a/pkger/models_test.go +++ b/pkger/models_test.go @@ -16,16 +16,12 @@ func TestPkg(t *testing.T) { pkg := Pkg{ mBuckets: map[string]*bucket{ "buck_2": { - id: influxdb.ID(2), - OrgID: influxdb.ID(100), Description: "desc2", - identity: identity{name: &references{val: "name2"}}, + identity: identity{name: &references{val: "pkgName2"}, displayName: &references{val: "name2"}}, RetentionRules: retentionRules{newRetentionRule(2 * time.Hour)}, }, "buck_1": { - id: influxdb.ID(1), - OrgID: influxdb.ID(100), - identity: identity{name: &references{val: "name1"}}, + identity: identity{name: &references{val: "pkgName1"}, displayName: &references{val: "name1"}}, Description: "desc1", RetentionRules: retentionRules{newRetentionRule(time.Hour)}, }, @@ -33,13 +29,13 @@ func TestPkg(t *testing.T) { } summary := pkg.Summary() - require.Len(t, summary.Buckets, len(pkg.mBuckets)) for i := 1; i <= len(summary.Buckets); i++ { buck := summary.Buckets[i-1] - assert.Equal(t, SafeID(i), buck.ID) - assert.Equal(t, SafeID(100), buck.OrgID) + assert.Zero(t, buck.ID) + assert.Zero(t, buck.OrgID) assert.Equal(t, "desc"+strconv.Itoa(i), buck.Description) + assert.Equal(t, "pkgName"+strconv.Itoa(i), buck.PkgName) assert.Equal(t, "name"+strconv.Itoa(i), buck.Name) assert.Equal(t, time.Duration(i)*time.Hour, buck.RetentionPeriod) } @@ -49,16 +45,12 @@ func TestPkg(t *testing.T) { pkg := Pkg{ mLabels: map[string]*label{ "2": { - id: influxdb.ID(2), - OrgID: influxdb.ID(100), - identity: identity{name: &references{val: "name2"}}, + identity: identity{name: &references{val: "pkgName2"}, displayName: &references{val: "name2"}}, Description: "desc2", Color: "blurple", }, "1": { - id: influxdb.ID(1), - OrgID: influxdb.ID(100), - identity: identity{name: &references{val: "name1"}}, + identity: identity{name: &references{val: "pkgName1"}, displayName: &references{val: "name1"}}, Description: "desc1", Color: "peru", }, @@ -69,29 +61,24 @@ func TestPkg(t *testing.T) { require.Len(t, summary.Labels, len(pkg.mLabels)) label1 := summary.Labels[0] - assert.Equal(t, SafeID(1), label1.ID) - assert.Equal(t, SafeID(100), label1.OrgID) + assert.Equal(t, "pkgName1", label1.PkgName) assert.Equal(t, "name1", label1.Name) assert.Equal(t, "desc1", label1.Properties.Description) assert.Equal(t, "peru", label1.Properties.Color) label2 := summary.Labels[1] - assert.Equal(t, SafeID(2), label2.ID) - assert.Equal(t, SafeID(100), label2.OrgID) - assert.Equal(t, "desc2", label2.Properties.Description) + assert.Equal(t, "pkgName2", label2.PkgName) assert.Equal(t, "name2", label2.Name) + assert.Equal(t, "desc2", label2.Properties.Description) assert.Equal(t, "blurple", label2.Properties.Color) }) t.Run("label mappings returned in asc order by name", func(t *testing.T) { bucket1 := &bucket{ - id: influxdb.ID(20), - identity: identity{name: &references{val: "b1"}}, + identity: identity{name: &references{val: "pkgBucket1"}, displayName: &references{val: "bd1"}}, } label1 := &label{ - id: influxdb.ID(2), - OrgID: influxdb.ID(100), - identity: identity{name: &references{val: "name2"}}, + identity: identity{name: &references{val: "pkgLabel2"}, displayName: &references{val: "name2"}}, Description: "desc2", Color: "blurple", associationMapping: associationMapping{ @@ -108,18 +95,18 @@ func TestPkg(t *testing.T) { bucket1.labels = append(bucket1.labels, label1) pkg := Pkg{ - mBuckets: map[string]*bucket{bucket1.Name(): bucket1}, - mLabels: map[string]*label{label1.Name(): label1}, + mBuckets: map[string]*bucket{bucket1.PkgName(): bucket1}, + mLabels: map[string]*label{label1.PkgName(): label1}, } summary := pkg.Summary() require.Len(t, summary.LabelMappings, 1) mapping1 := summary.LabelMappings[0] - assert.Equal(t, SafeID(bucket1.id), mapping1.ResourceID) + assert.Equal(t, bucket1.PkgName(), mapping1.ResourcePkgName) assert.Equal(t, bucket1.Name(), mapping1.ResourceName) assert.Equal(t, influxdb.BucketsResourceType, mapping1.ResourceType) - assert.Equal(t, SafeID(label1.id), mapping1.LabelID) + assert.Equal(t, label1.PkgName(), mapping1.LabelPkgName) assert.Equal(t, label1.Name(), mapping1.LabelName) }) }) @@ -489,11 +476,6 @@ func TestPkg(t *testing.T) { kind Kind validName string }{ - { - pkgFile: "testdata/bucket.yml", - kind: KindBucket, - validName: "rucket_11", - }, { pkgFile: "testdata/checks.yml", kind: KindCheck, diff --git a/pkger/parser.go b/pkger/parser.go index 3e4d7556c3..0528f7b1bf 100644 --- a/pkger/parser.go +++ b/pkger/parser.go @@ -257,8 +257,7 @@ type Pkg struct { mEnvVals map[string]string mSecrets map[string]bool - isVerified bool // dry run has verified pkg resources with existing resources - isParsed bool // indicates the pkg has been parsed and all resources graphed accordingly + isParsed bool // indicates the pkg has been parsed and all resources graphed accordingly } // Encode is a helper for encoding the pkg correctly. @@ -302,21 +301,13 @@ func (p *Pkg) Summary() Summary { NotificationRules: []SummaryNotificationRule{}, Labels: []SummaryLabel{}, MissingEnvs: p.missingEnvRefs(), - MissingSecrets: []string{}, + MissingSecrets: p.missingSecrets(), Tasks: []SummaryTask{}, TelegrafConfigs: []SummaryTelegraf{}, Variables: []SummaryVariable{}, } - // only add this after dry run has been completed - if p.isVerified { - sum.MissingSecrets = p.missingSecrets() - } - for _, b := range p.buckets() { - if b.shouldRemove { - continue - } sum.Buckets = append(sum.Buckets, b.summarize()) } @@ -418,11 +409,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) { } switch k { - case KindBucket: - p.mBuckets[pkgName] = &bucket{ - identity: newIdentity, - id: id, - } case KindCheck, KindCheckDeadman, KindCheckThreshold: p.mChecks[pkgName] = &check{ identity: newIdentity, @@ -471,11 +457,6 @@ func (p *Pkg) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) { func (p *Pkg) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool) { switch k { - case KindBucket: - b, ok := p.mBuckets[pkgName] - return func(id influxdb.ID) { - b.id = id - }, ok case KindCheck, KindCheckDeadman, KindCheckThreshold: ch, ok := p.mChecks[pkgName] return func(id influxdb.ID) { diff --git a/pkger/parser_models.go b/pkger/parser_models.go new file mode 100644 index 0000000000..d6e348ba44 --- /dev/null +++ b/pkger/parser_models.go @@ -0,0 +1,321 @@ +package pkger + +import ( + "strconv" + "time" + + "github.com/influxdata/influxdb/v2" +) + +type identity struct { + name *references + displayName *references + shouldRemove bool +} + +func (i *identity) Name() string { + if displayName := i.displayName.String(); displayName != "" { + return displayName + } + return i.name.String() +} + +func (i *identity) PkgName() string { + return i.name.String() +} + +const ( + fieldAPIVersion = "apiVersion" + fieldAssociations = "associations" + fieldDescription = "description" + fieldEvery = "every" + fieldKey = "key" + fieldKind = "kind" + fieldLanguage = "language" + fieldLevel = "level" + fieldMin = "min" + fieldMax = "max" + fieldMetadata = "metadata" + fieldName = "name" + fieldOffset = "offset" + fieldOperator = "operator" + fieldPrefix = "prefix" + fieldQuery = "query" + fieldSuffix = "suffix" + fieldSpec = "spec" + fieldStatus = "status" + fieldType = "type" + fieldValue = "value" + fieldValues = "values" +) + +const ( + fieldBucketRetentionRules = "retentionRules" +) + +const bucketNameMinLength = 2 + +type bucket struct { + identity + + Description string + RetentionRules retentionRules + labels sortedLabels +} + +func (b *bucket) summarize() SummaryBucket { + return SummaryBucket{ + Name: b.Name(), + PkgName: b.PkgName(), + Description: b.Description, + RetentionPeriod: b.RetentionRules.RP(), + LabelAssociations: toSummaryLabels(b.labels...), + } +} + +func (b *bucket) ResourceType() influxdb.ResourceType { + return KindBucket.ResourceType() +} + +func (b *bucket) valid() []validationErr { + var vErrs []validationErr + if err, ok := isValidName(b.Name(), bucketNameMinLength); !ok { + vErrs = append(vErrs, err) + } + vErrs = append(vErrs, b.RetentionRules.valid()...) + if len(vErrs) == 0 { + return nil + } + return []validationErr{ + objectValidationErr(fieldSpec, vErrs...), + } +} + +const ( + retentionRuleTypeExpire = "expire" +) + +type retentionRule struct { + Type string `json:"type" yaml:"type"` + Seconds int `json:"everySeconds" yaml:"everySeconds"` +} + +func newRetentionRule(d time.Duration) retentionRule { + return retentionRule{ + Type: retentionRuleTypeExpire, + Seconds: int(d.Round(time.Second) / time.Second), + } +} + +func (r retentionRule) valid() []validationErr { + const hour = 3600 + var ff []validationErr + if r.Seconds < hour { + ff = append(ff, validationErr{ + Field: fieldRetentionRulesEverySeconds, + Msg: "seconds must be a minimum of " + strconv.Itoa(hour), + }) + } + if r.Type != retentionRuleTypeExpire { + ff = append(ff, validationErr{ + Field: fieldType, + Msg: `type must be "expire"`, + }) + } + return ff +} + +const ( + fieldRetentionRulesEverySeconds = "everySeconds" +) + +type retentionRules []retentionRule + +func (r retentionRules) RP() time.Duration { + // TODO: this feels very odd to me, will need to follow up with + // team to better understand this + for _, rule := range r { + return time.Duration(rule.Seconds) * time.Second + } + return 0 +} + +func (r retentionRules) valid() []validationErr { + var failures []validationErr + for i, rule := range r { + if ff := rule.valid(); len(ff) > 0 { + failures = append(failures, validationErr{ + Field: fieldBucketRetentionRules, + Index: intPtr(i), + Nested: ff, + }) + } + } + return failures +} + +type assocMapKey struct { + resType influxdb.ResourceType + name string +} + +type assocMapVal struct { + exists bool + v interface{} +} + +func (l assocMapVal) ID() influxdb.ID { + if t, ok := l.v.(labelAssociater); ok { + return t.ID() + } + return 0 +} + +func (l assocMapVal) PkgName() string { + t, ok := l.v.(interface{ PkgName() string }) + if ok { + return t.PkgName() + } + return "" +} + +type associationMapping struct { + mappings map[assocMapKey][]assocMapVal +} + +func (l *associationMapping) setMapping(v interface { + ResourceType() influxdb.ResourceType + Name() string +}, exists bool) { + if l == nil { + return + } + if l.mappings == nil { + l.mappings = make(map[assocMapKey][]assocMapVal) + } + + k := assocMapKey{ + resType: v.ResourceType(), + name: v.Name(), + } + val := assocMapVal{ + exists: exists, + v: v, + } + existing, ok := l.mappings[k] + if !ok { + l.mappings[k] = []assocMapVal{val} + return + } + for i, ex := range existing { + if ex.v == v { + existing[i].exists = exists + return + } + } + l.mappings[k] = append(l.mappings[k], val) +} + +const ( + fieldLabelColor = "color" +) + +const labelNameMinLength = 2 + +type label struct { + id influxdb.ID + identity + + Color string + Description string + associationMapping + + // exists provides context for a resource that already + // exists in the platform. If a resource already exists(exists=true) + // then the ID should be populated. + existing *influxdb.Label +} + +func (l *label) summarize() SummaryLabel { + return SummaryLabel{ + PkgName: l.PkgName(), + Name: l.Name(), + Properties: struct { + Color string `json:"color"` + Description string `json:"description"` + }{ + Color: l.Color, + Description: l.Description, + }, + } +} + +func (l *label) mappingSummary() []SummaryLabelMapping { + var mappings []SummaryLabelMapping + for resource, vals := range l.mappings { + for _, v := range vals { + status := StateStatusNew + if v.exists { + status = StateStatusExists + } + mappings = append(mappings, SummaryLabelMapping{ + exists: v.exists, + Status: status, + ResourceID: SafeID(v.ID()), + ResourcePkgName: v.PkgName(), + ResourceName: resource.name, + ResourceType: resource.resType, + LabelID: SafeID(l.ID()), + LabelPkgName: l.PkgName(), + LabelName: l.Name(), + }) + } + } + + return mappings +} + +func (l *label) ID() influxdb.ID { + if l.id != 0 { + return l.id + } + if l.existing != nil { + return l.existing.ID + } + return 0 +} + +func (l *label) valid() []validationErr { + var vErrs []validationErr + if err, ok := isValidName(l.Name(), labelNameMinLength); !ok { + vErrs = append(vErrs, err) + } + if len(vErrs) == 0 { + return nil + } + return []validationErr{ + objectValidationErr(fieldSpec, vErrs...), + } +} + +func toSummaryLabels(labels ...*label) []SummaryLabel { + iLabels := make([]SummaryLabel, 0, len(labels)) + for _, l := range labels { + iLabels = append(iLabels, l.summarize()) + } + return iLabels +} + +type sortedLabels []*label + +func (s sortedLabels) Len() int { + return len(s) +} + +func (s sortedLabels) Less(i, j int) bool { + return s[i].Name() < s[j].Name() +} + +func (s sortedLabels) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/pkger/parser_test.go b/pkger/parser_test.go index 8e6e9b0cd3..ff6cfae72d 100644 --- a/pkger/parser_test.go +++ b/pkger/parser_test.go @@ -339,27 +339,35 @@ spec: expectedMappings := []SummaryLabelMapping{ { - ResourceName: "rucket_1", - LabelName: "label_1", + ResourcePkgName: "rucket_1", + ResourceName: "rucket_1", + LabelPkgName: "label_1", + LabelName: "label_1", }, { - ResourceName: "rucket_2", - LabelName: "label_2", + ResourcePkgName: "rucket_2", + ResourceName: "rucket_2", + LabelPkgName: "label_2", + LabelName: "label_2", }, { - ResourceName: "rucket_3", - LabelName: "label_1", + ResourcePkgName: "rucket_3", + ResourceName: "rucket_3", + LabelPkgName: "label_1", + LabelName: "label_1", }, { - ResourceName: "rucket_3", - LabelName: "label_2", + ResourcePkgName: "rucket_3", + ResourceName: "rucket_3", + LabelPkgName: "label_2", + LabelName: "label_2", }, } - require.Len(t, sum.LabelMappings, len(expectedMappings)) - for i, expected := range expectedMappings { - expected.ResourceType = influxdb.BucketsResourceType - assert.Equal(t, expected, sum.LabelMappings[i]) + for _, expectedMapping := range expectedMappings { + expectedMapping.Status = StateStatusNew + expectedMapping.ResourceType = influxdb.BucketsResourceType + assert.Contains(t, sum.LabelMappings, expectedMapping) } }) }) @@ -521,18 +529,25 @@ spec: assert.True(t, deadmanCheck.ReportZero) assert.Len(t, check2.LabelAssociations, 1) - containsLabelMappings(t, sum.LabelMappings, - labelMapping{ - labelName: "label_1", - resName: "check_0", - resType: influxdb.ChecksResourceType, + expectedMappings := []SummaryLabelMapping{ + { + LabelPkgName: "label_1", + LabelName: "label_1", + ResourcePkgName: "check_0", + ResourceName: "check_0", }, - labelMapping{ - labelName: "label_1", - resName: "display name", - resType: influxdb.ChecksResourceType, + { + LabelPkgName: "label_1", + LabelName: "label_1", + ResourcePkgName: "check_1", + ResourceName: "display name", }, - ) + } + for _, expected := range expectedMappings { + expected.Status = StateStatusNew + expected.ResourceType = influxdb.ChecksResourceType + assert.Contains(t, sum.LabelMappings, expected) + } }) }) @@ -2589,18 +2604,14 @@ spec: actualLabel := actual.LabelAssociations[0] assert.Equal(t, "label_1", actualLabel.Name) - expectedMappings := []SummaryLabelMapping{ - { - ResourceName: "dash_1", - LabelName: "label_1", - }, - } - require.Len(t, sum.LabelMappings, len(expectedMappings)) - - for i, expected := range expectedMappings { - expected.ResourceType = influxdb.DashboardsResourceType - assert.Equal(t, expected, sum.LabelMappings[i]) - } + assert.Contains(t, sum.LabelMappings, SummaryLabelMapping{ + Status: StateStatusNew, + ResourceType: influxdb.DashboardsResourceType, + ResourcePkgName: "dash_1", + ResourceName: "dash_1", + LabelPkgName: "label_1", + LabelName: "label_1", + }) }) }) @@ -2696,6 +2707,7 @@ spec: testfileRunner(t, "testdata/notification_endpoint", func(t *testing.T, pkg *Pkg) { expectedEndpoints := []SummaryNotificationEndpoint{ { + PkgName: "http_basic_auth_notification_endpoint", NotificationEndpoint: &endpoint.HTTP{ Base: endpoint.Base{ Name: "basic endpoint name", @@ -2710,6 +2722,7 @@ spec: }, }, { + PkgName: "http_bearer_auth_notification_endpoint", NotificationEndpoint: &endpoint.HTTP{ Base: endpoint.Base{ Name: "http_bearer_auth_notification_endpoint", @@ -2723,6 +2736,7 @@ spec: }, }, { + PkgName: "http_none_auth_notification_endpoint", NotificationEndpoint: &endpoint.HTTP{ Base: endpoint.Base{ Name: "http_none_auth_notification_endpoint", @@ -2735,6 +2749,7 @@ spec: }, }, { + PkgName: "pager_duty_notification_endpoint", NotificationEndpoint: &endpoint.PagerDuty{ Base: endpoint.Base{ Name: "pager duty name", @@ -2746,6 +2761,7 @@ spec: }, }, { + PkgName: "slack_notification_endpoint", NotificationEndpoint: &endpoint.Slack{ Base: endpoint.Base{ Name: "slack name", @@ -2769,10 +2785,13 @@ spec: require.Len(t, actual.LabelAssociations, 1) assert.Equal(t, "label_1", actual.LabelAssociations[0].Name) - containsLabelMappings(t, sum.LabelMappings, labelMapping{ - labelName: "label_1", - resName: expected.NotificationEndpoint.GetName(), - resType: influxdb.NotificationEndpointResourceType, + assert.Contains(t, sum.LabelMappings, SummaryLabelMapping{ + Status: StateStatusNew, + ResourceType: influxdb.NotificationEndpointResourceType, + ResourcePkgName: expected.PkgName, + ResourceName: expected.NotificationEndpoint.GetName(), + LabelPkgName: "label_1", + LabelName: "label_1", }) } }) @@ -3529,9 +3548,12 @@ spec: require.Len(t, sum.LabelMappings, 1) expectedMapping := SummaryLabelMapping{ - ResourceName: "display name", - LabelName: "label_1", - ResourceType: influxdb.TelegrafsResourceType, + Status: StateStatusNew, + ResourcePkgName: "first_tele_config", + ResourceName: "display name", + LabelPkgName: "label_1", + LabelName: "label_1", + ResourceType: influxdb.TelegrafsResourceType, } assert.Equal(t, expectedMapping, sum.LabelMappings[0]) }) @@ -3803,8 +3825,11 @@ spec: expectedMappings := []SummaryLabelMapping{ { - ResourceName: "var_1", - LabelName: "label_1", + Status: StateStatusNew, + ResourcePkgName: "var_1", + ResourceName: "var_1", + LabelPkgName: "label_1", + LabelName: "label_1", }, } @@ -4406,25 +4431,6 @@ func testfileRunner(t *testing.T, path string, testFn func(t *testing.T, pkg *Pk } } -type labelMapping struct { - labelName string - resName string - resType influxdb.ResourceType -} - -func containsLabelMappings(t *testing.T, labelMappings []SummaryLabelMapping, matches ...labelMapping) { - t.Helper() - - for _, expected := range matches { - expectedMapping := SummaryLabelMapping{ - ResourceName: expected.resName, - LabelName: expected.labelName, - ResourceType: expected.resType, - } - assert.Contains(t, labelMappings, expectedMapping) - } -} - func strPtr(s string) *string { return &s } diff --git a/pkger/service.go b/pkger/service.go index 98da2ea853..b83a3bc7c5 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -52,7 +52,7 @@ type SVC interface { InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) - Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) + Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) } // SVCMiddleware is a service middleware func. @@ -636,6 +636,11 @@ func (s *Service) filterOrgResourceKinds(resourceKindFilters []Kind) []struct { // for later calls to Apply. This func will be run on an Apply if it has not been run // already. func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) { + sum, diff, _, err := s.dryRun(ctx, orgID, pkg, opts...) + return sum, diff, err +} + +func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, *stateCoordinator, error) { // so here's the deal, when we have issues with the parsing validation, we // continue to do the diff anyhow. any resource that does not have a name // will be skipped, and won't bleed into the dry run here. We can now return @@ -644,7 +649,7 @@ func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk if !pkg.isParsed { err := pkg.Validate() if err != nil && !IsParseErr(err) { - return Summary{}, Diff{}, internalErr(err) + return Summary{}, Diff{}, nil, internalErr(err) } parseErr = err } @@ -654,75 +659,74 @@ func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk if len(opt.EnvRefs) > 0 { err := pkg.applyEnvRefs(opt.EnvRefs) if err != nil && !IsParseErr(err) { - return Summary{}, Diff{}, internalErr(err) + return Summary{}, Diff{}, nil, internalErr(err) } parseErr = err } + state := newStateCoordinator(pkg) + if opt.StackID > 0 { - if err := s.addStackPkgState(ctx, opt.StackID, pkg); err != nil { - return Summary{}, Diff{}, internalErr(err) + if err := s.addStackState(ctx, opt.StackID, pkg, state); err != nil { + return Summary{}, Diff{}, nil, internalErr(err) } } if err := s.dryRunSecrets(ctx, orgID, pkg); err != nil { - return Summary{}, Diff{}, err + return Summary{}, Diff{}, nil, err } - diff := Diff{ - Buckets: s.dryRunBuckets(ctx, orgID, pkg), - Checks: s.dryRunChecks(ctx, orgID, pkg), - Dashboards: s.dryRunDashboards(pkg), - Labels: s.dryRunLabels(ctx, orgID, pkg), - Tasks: s.dryRunTasks(pkg), - Telegrafs: s.dryRunTelegraf(pkg), - Variables: s.dryRunVariables(ctx, orgID, pkg), - } + s.dryRunBuckets(ctx, orgID, state.mBuckets) + s.dryRunLabels(ctx, orgID, state.mLabels) + + var diff Diff + diff.Checks = s.dryRunChecks(ctx, orgID, pkg) + diff.Dashboards = s.dryRunDashboards(pkg) + diff.Tasks = s.dryRunTasks(pkg) + diff.Telegrafs = s.dryRunTelegraf(pkg) + diff.Variables = s.dryRunVariables(ctx, orgID, pkg) diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg) if err != nil { - return Summary{}, Diff{}, err + return Summary{}, Diff{}, nil, err } diff.NotificationEndpoints = diffEndpoints diffRules, err := s.dryRunNotificationRules(ctx, orgID, pkg) if err != nil { - return Summary{}, Diff{}, err + return Summary{}, Diff{}, nil, err } diff.NotificationRules = diffRules - diffLabelMappings, err := s.dryRunLabelMappings(ctx, pkg) + stateLabelMappings, err := s.dryRunLabelMappingsV2(ctx, state) if err != nil { - return Summary{}, Diff{}, err + return Summary{}, Diff{}, nil, err } - diff.LabelMappings = diffLabelMappings + state.labelMappings = stateLabelMappings - // verify the pkg is verified by a dry run. when calling Service.Apply this - // is required to have been run. if it is not true, then apply runs - // the Dry run. - pkg.isVerified = true - return pkg.Summary(), diff, parseErr + stateDiff := state.diff() + + diffLabelMappings, err := s.dryRunLabelMappings(ctx, pkg, state) + if err != nil { + return Summary{}, Diff{}, nil, err + } + diff.LabelMappings = append(diffLabelMappings, diffLabelMappings...) + + diff.Buckets = stateDiff.Buckets + diff.Labels = stateDiff.Labels + diff.LabelMappings = append(stateDiff.LabelMappings, diff.LabelMappings...) + + return pkg.Summary(), diff, state, parseErr } -func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffBucket { - mExistingBkts := make(map[string]DiffBucket) - bkts := pkg.buckets() - for i := range bkts { - b := bkts[i] - existingBkt, _ := s.findBucket(ctx, orgID, b) // ignoring error here - b.existing = existingBkt - mExistingBkts[b.Name()] = newDiffBucket(b, existingBkt) +func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, bkts map[string]*stateBucket) { + for _, stateBkt := range bkts { + if stateBkt.ID() != 0 { + stateBkt.existing, _ = s.bucketSVC.FindBucketByID(ctx, stateBkt.ID()) + } else { + stateBkt.existing, _ = s.bucketSVC.FindBucketByName(ctx, orgID, stateBkt.Name()) + } } - - diffs := make([]DiffBucket, 0, len(mExistingBkts)) - for _, diff := range mExistingBkts { - diffs = append(diffs, diff) - } - sort.Slice(diffs, func(i, j int) bool { - return diffs[i].PkgName < diffs[j].PkgName - }) - - return diffs } func (s *Service) dryRunChecks(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffCheck { @@ -756,25 +760,11 @@ func (s *Service) dryRunDashboards(pkg *Pkg) []DiffDashboard { return diffs } -func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffLabel { - mExistingLabels := make(map[string]DiffLabel) - labels := pkg.labels() - for i := range labels { - pkgLabel := labels[i] +func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, labels map[string]*stateLabel) { + for _, pkgLabel := range labels { existingLabel, _ := s.findLabel(ctx, orgID, pkgLabel) pkgLabel.existing = existingLabel - mExistingLabels[pkgLabel.Name()] = newDiffLabel(pkgLabel, existingLabel) } - - diffs := make([]DiffLabel, 0, len(mExistingLabels)) - for _, diff := range mExistingLabels { - diffs = append(diffs, diff) - } - sort.Slice(diffs, func(i, j int) bool { - return diffs[i].PkgName < diffs[j].PkgName - }) - - return diffs } func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationEndpoint, error) { @@ -962,7 +952,7 @@ func (s *Service) dryRunVariables(ctx context.Context, orgID influxdb.ID, pkg *P } type ( - labelMappingDiffFn func(labelID influxdb.ID, labelName string, isNew bool) + labelMappingDiffFn func(labelID influxdb.ID, labelPkgName, labelName string, isNew bool) labelMappers interface { Association(i int) labelAssociater @@ -972,15 +962,15 @@ type ( labelAssociater interface { ID() influxdb.ID Name() string + PkgName() string Labels() []*label ResourceType() influxdb.ResourceType Exists() bool } ) -func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabelMapping, error) { +func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg, state *stateCoordinator) ([]DiffLabelMapping, error) { mappers := []labelMappers{ - mapperBuckets(pkg.buckets()), mapperChecks(pkg.checks()), mapperDashboards(pkg.dashboards()), mapperNotificationEndpoints(pkg.notificationEndpoints()), @@ -994,19 +984,26 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe for _, mapper := range mappers { for i := 0; i < mapper.Len(); i++ { la := mapper.Association(i) - err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelName string, isNew bool) { - existingLabel, ok := pkg.mLabels[labelName] + err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelPkgName, labelName string, isNew bool) { + existingLabel, ok := state.mLabels[labelName] if !ok { return } existingLabel.setMapping(la, !isNew) + + status := StateStatusExists + if isNew { + status = StateStatusNew + } diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: la.ResourceType(), - ResID: SafeID(la.ID()), - ResName: la.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, + StateStatus: status, + ResType: la.ResourceType(), + ResID: SafeID(la.ID()), + ResPkgName: la.PkgName(), + ResName: la.Name(), + LabelID: SafeID(labelID), + LabelPkgName: labelPkgName, + LabelName: labelName, }) }) if err != nil { @@ -1039,7 +1036,7 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, mappingFn labelMappingDiffFn) error { if !la.Exists() { for _, l := range la.Labels() { - mappingFn(l.ID(), l.Name(), true) + mappingFn(l.ID(), l.PkgName(), l.Name(), true) } return nil } @@ -1059,22 +1056,109 @@ func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssoci pkgLabels := labelSlcToMap(la.Labels()) for _, l := range existingLabels { - // should ignore any labels that are not specified in pkg - mappingFn(l.ID, l.Name, false) + // if label is found in state then we track the mapping and mark it existing + // otherwise we continue on delete(pkgLabels, l.Name) + if pkgLabel, ok := pkgLabels[l.Name]; ok { + mappingFn(l.ID, pkgLabel.PkgName(), l.Name, false) + } } // now we add labels that were not apart of the existing labels for _, l := range pkgLabels { - mappingFn(l.ID(), l.Name(), true) + mappingFn(l.ID(), l.PkgName(), l.Name(), true) } return nil } -func (s *Service) addStackPkgState(ctx context.Context, stackID influxdb.ID, pkg *Pkg) error { +func (s *Service) dryRunLabelMappingsV2(ctx context.Context, state *stateCoordinator) ([]stateLabelMapping, error) { + stateLabelsByResName := make(map[string]*stateLabel) + for _, l := range state.mLabels { + if l.shouldRemove { + continue + } + stateLabelsByResName[l.Name()] = l + } + + var mappings []stateLabelMapping + for _, b := range state.mBuckets { + if b.shouldRemove { + continue + } + mm, err := s.dryRunResourceLabelMappingV2(ctx, state, stateLabelsByResName, b) + if err != nil { + return nil, err + } + mappings = append(mappings, mm...) + } + + return mappings, nil +} + +func (s *Service) dryRunResourceLabelMappingV2(ctx context.Context, state *stateCoordinator, stateLabelsByResName map[string]*stateLabel, associatedResource interface { + labels() []*label + stateIdentity() stateIdentity +}) ([]stateLabelMapping, error) { + + ident := associatedResource.stateIdentity() + pkgResourceLabels := associatedResource.labels() + + var mappings []stateLabelMapping + if !ident.exists() { + for _, l := range pkgResourceLabels { + mappings = append(mappings, stateLabelMapping{ + status: StateStatusNew, + resource: associatedResource, + label: state.getLabelByPkgName(l.PkgName()), + }) + } + return mappings, nil + } + + existingLabels, err := s.labelSVC.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ + ResourceID: ident.id, + ResourceType: ident.resourceType, + }) + if err != nil { + return nil, err + } + + pkgLabels := labelSlcToMap(pkgResourceLabels) + for _, l := range existingLabels { + // if label is found in state then we track the mapping and mark it existing + // otherwise we continue on + delete(pkgLabels, l.Name) + if sLabel, ok := stateLabelsByResName[l.Name]; ok { + mappings = append(mappings, stateLabelMapping{ + status: StateStatusExists, + resource: associatedResource, + label: sLabel, + }) + } + } + + // now we add labels that do not exist + for _, l := range pkgLabels { + mappings = append(mappings, stateLabelMapping{ + status: StateStatusNew, + resource: associatedResource, + label: state.getLabelByPkgName(l.PkgName()), + }) + } + + return mappings, nil +} + +func (s *Service) addStackState(ctx context.Context, stackID influxdb.ID, pkg *Pkg, state *stateCoordinator) error { stack, err := s.store.ReadStackByID(ctx, stackID) if err != nil { - return internalErr(err) + return ierrors.Wrap(internalErr(err), "reading stack") + } + + type stateMapper interface { + Contains(kind Kind, pkgName string) bool + setObjectID(kind Kind, pkgName string, id influxdb.ID) + addObjectForRemoval(kind Kind, pkgName string, id influxdb.ID) } // check resource exists in pkg @@ -1083,9 +1167,16 @@ func (s *Service) addStackPkgState(ctx context.Context, stackID influxdb.ID, pkg // else // add stub pkg resource that indicates it should be deleted for _, r := range stack.Resources { - updateFn := pkg.setObjectID - if !pkg.Contains(r.Kind, r.Name) { - updateFn = pkg.addObjectForRemoval + var mapper stateMapper = pkg + if r.Kind.is(KindBucket, KindLabel) { + // hack for time being while we transition state out of pkg. + // this will take several passes to finish up. + mapper = state + } + + updateFn := mapper.setObjectID + if !mapper.Contains(r.Kind, r.Name) { + updateFn = mapper.addObjectForRemoval } updateFn(r.Kind, r.Name, r.ID) } @@ -1135,23 +1226,22 @@ func applyOptFromOptFns(opts ...ApplyOptFn) ApplyOpt { // Apply will apply all the resources identified in the provided pkg. The entire pkg will be applied // in its entirety. If a failure happens midway then the entire pkg will be rolled back to the state // from before the pkg were applied. -func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, e error) { +func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, diff Diff, e error) { if !pkg.isParsed { if err := pkg.Validate(); err != nil { - return Summary{}, failedValidationErr(err) + return Summary{}, Diff{}, failedValidationErr(err) } } opt := applyOptFromOptFns(opts...) if err := pkg.applyEnvRefs(opt.EnvRefs); err != nil { - return Summary{}, failedValidationErr(err) + return Summary{}, Diff{}, failedValidationErr(err) } - if !pkg.isVerified { - if _, _, err := s.DryRun(ctx, orgID, userID, pkg, opts...); err != nil { - return Summary{}, err - } + _, diff, state, err := s.dryRun(ctx, orgID, pkg, opts...) + if err != nil { + return Summary{}, Diff{}, err } defer func() { @@ -1164,7 +1254,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg if e != nil { updateStackFn = s.updateStackAfterRollback } - if err := updateStackFn(ctx, stackID, pkg); err != nil { + if err := updateStackFn(ctx, stackID, pkg, state); err != nil { s.log.Error("failed to update stack", zap.Error(err)) } }() @@ -1172,15 +1262,15 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg coordinator := &rollbackCoordinator{sem: make(chan struct{}, s.applyReqLimit)} defer coordinator.rollback(s.log, &e, orgID) - sum, err := s.apply(ctx, coordinator, orgID, userID, pkg, opt.MissingSecrets) + sum, err = s.apply(ctx, coordinator, orgID, userID, pkg, state, opt.MissingSecrets) if err != nil { - return Summary{}, err + return Summary{}, Diff{}, err } - return sum, nil + return sum, diff, err } -func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, orgID, userID influxdb.ID, pkg *Pkg, missingSecrets map[string]string) (sum Summary, e error) { +func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, orgID, userID influxdb.ID, pkg *Pkg, state *stateCoordinator, missingSecrets map[string]string) (sum Summary, e error) { // each grouping here runs for its entirety, then returns an error that // is indicative of running all appliers provided. For instance, the labels // may have 1 variable fail and one of the buckets fails. The errors aggregate so @@ -1198,12 +1288,12 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o }, { // deps for primary resources - s.applyLabels(ctx, pkg.labels()), + s.applyLabels(ctx, state.labels()), }, { // primary resources, can have relationships to labels s.applyVariables(ctx, pkg.variables()), - s.applyBuckets(ctx, pkg.buckets()), + s.applyBuckets(ctx, state.buckets()), s.applyChecks(ctx, pkg.checks()), s.applyDashboards(pkg.dashboards()), s.applyNotificationEndpoints(ctx, userID, pkg.notificationEndpoints()), @@ -1230,27 +1320,46 @@ func (s *Service) apply(ctx context.Context, coordinator *rollbackCoordinator, o // secondary resources // this last grouping relies on the above 2 steps having completely successfully - secondary := []applier{s.applyLabelMappings(pkg.labelMappings())} + secondary := []applier{ + s.applyLabelMappings(pkg.labelMappings()), + s.applyLabelMappingsV2(state.labelMappings), + } if err := coordinator.runTilEnd(ctx, orgID, userID, secondary...); err != nil { return Summary{}, internalErr(err) } pkg.applySecrets(missingSecrets) - return pkg.Summary(), nil + stateSum := state.summary() + + pkgSum := pkg.Summary() + pkgSum.Buckets = stateSum.Buckets + pkgSum.Labels = stateSum.Labels + + // filter out label mappings that are from pgk and replace with those + // in state. This is temporary hack to provide a bridge to the promise land... + for _, lm := range pkgSum.LabelMappings { + if lm.ResourceType == influxdb.BucketsResourceType { + continue + } + stateSum.LabelMappings = append(stateSum.LabelMappings, lm) + } + pkgSum.LabelMappings = stateSum.LabelMappings + + return pkgSum, nil } -func (s *Service) applyBuckets(ctx context.Context, buckets []*bucket) applier { +func (s *Service) applyBuckets(ctx context.Context, buckets []*stateBucket) applier { const resource = "bucket" mutex := new(doMutex) - rollbackBuckets := make([]*bucket, 0, len(buckets)) + rollbackBuckets := make([]*stateBucket, 0, len(buckets)) createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { - var b bucket + var b *stateBucket mutex.Do(func() { - buckets[i].OrgID = orgID - b = *buckets[i] + buckets[i].orgID = orgID + b = buckets[i] }) if !b.shouldApply() { return nil @@ -1284,20 +1393,21 @@ func (s *Service) applyBuckets(ctx context.Context, buckets []*bucket) applier { } } -func (s *Service) rollbackBuckets(ctx context.Context, buckets []*bucket) error { - rollbackFn := func(b *bucket) error { +func (s *Service) rollbackBuckets(ctx context.Context, buckets []*stateBucket) error { + rollbackFn := func(b *stateBucket) error { var err error switch { case b.shouldRemove: - err = s.bucketSVC.CreateBucket(ctx, b.existing) + err = ierrors.Wrap(s.bucketSVC.CreateBucket(ctx, b.existing), "rolling back removed bucket") case b.existing == nil: - err = s.bucketSVC.DeleteBucket(ctx, b.ID()) + err = ierrors.Wrap(s.bucketSVC.DeleteBucket(ctx, b.ID()), "rolling back new bucket") default: rp := b.RetentionRules.RP() _, err = s.bucketSVC.UpdateBucket(ctx, b.ID(), influxdb.BucketUpdate{ Description: &b.Description, RetentionPeriod: &rp, }) + err = ierrors.Wrap(err, "rolling back existing bucket to previous state") } return err } @@ -1317,7 +1427,7 @@ func (s *Service) rollbackBuckets(ctx context.Context, buckets []*bucket) error return nil } -func (s *Service) applyBucket(ctx context.Context, b bucket) (influxdb.Bucket, error) { +func (s *Service) applyBucket(ctx context.Context, b *stateBucket) (influxdb.Bucket, error) { if b.shouldRemove { if err := s.bucketSVC.DeleteBucket(ctx, b.ID()); err != nil { return influxdb.Bucket{}, fmt.Errorf("failed to delete bucket[%q]: %w", b.ID(), err) @@ -1340,7 +1450,7 @@ func (s *Service) applyBucket(ctx context.Context, b bucket) (influxdb.Bucket, e } influxBucket := influxdb.Bucket{ - OrgID: b.OrgID, + OrgID: b.orgID, Description: b.Description, Name: b.Name(), RetentionPeriod: rp, @@ -1542,17 +1652,17 @@ func convertChartsToCells(ch []chart) []*influxdb.Cell { return icells } -func (s *Service) applyLabels(ctx context.Context, labels []*label) applier { +func (s *Service) applyLabels(ctx context.Context, labels []*stateLabel) applier { const resource = "label" mutex := new(doMutex) - rollBackLabels := make([]*label, 0, len(labels)) + rollBackLabels := make([]*stateLabel, 0, len(labels)) createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { - var l label + var l *stateLabel mutex.Do(func() { - labels[i].OrgID = orgID - l = *labels[i] + labels[i].orgID = orgID + l = labels[i] }) if !l.shouldApply() { return nil @@ -1568,6 +1678,7 @@ func (s *Service) applyLabels(ctx context.Context, labels []*label) applier { mutex.Do(func() { labels[i].id = influxLabel.ID + labels[i].label.id = influxLabel.ID rollBackLabels = append(rollBackLabels, labels[i]) }) @@ -1586,8 +1697,8 @@ func (s *Service) applyLabels(ctx context.Context, labels []*label) applier { } } -func (s *Service) rollbackLabels(ctx context.Context, labels []*label) error { - rollbackFn := func(l *label) error { +func (s *Service) rollbackLabels(ctx context.Context, labels []*stateLabel) error { + rollbackFn := func(l *stateLabel) error { var err error switch { case l.shouldRemove: @@ -1617,32 +1728,30 @@ func (s *Service) rollbackLabels(ctx context.Context, labels []*label) error { return nil } -func (s *Service) applyLabel(ctx context.Context, l label) (influxdb.Label, error) { - if l.shouldRemove { - if err := s.labelSVC.DeleteLabel(ctx, l.ID()); err != nil { - return influxdb.Label{}, fmt.Errorf("failed to delete label[%q]: %w", l.ID(), err) - } - return *l.existing, nil - } - - if l.existing != nil { - updatedlabel, err := s.labelSVC.UpdateLabel(ctx, l.ID(), influxdb.LabelUpdate{ +func (s *Service) applyLabel(ctx context.Context, l *stateLabel) (influxdb.Label, error) { + var ( + influxLabel *influxdb.Label + err error + ) + switch { + case l.shouldRemove: + influxLabel, err = l.existing, s.labelSVC.DeleteLabel(ctx, l.ID()) + case l.Exists(): + influxLabel, err = s.labelSVC.UpdateLabel(ctx, l.ID(), influxdb.LabelUpdate{ Name: l.Name(), Properties: l.properties(), }) - if err != nil { - return influxdb.Label{}, err - } - return *updatedlabel, nil + err = ierrors.Wrap(err, "updating") + default: + creatLabel := l.toInfluxLabel() + influxLabel = &creatLabel + err = ierrors.Wrap(s.labelSVC.CreateLabel(ctx, &creatLabel), "creating") } - - influxLabel := l.toInfluxLabel() - err := s.labelSVC.CreateLabel(ctx, &influxLabel) - if err != nil { + if err != nil || influxLabel == nil { return influxdb.Label{}, err } - return influxLabel, nil + return *influxLabel, nil } func (s *Service) applyNotificationEndpoints(ctx context.Context, userID influxdb.ID, endpoints []*notificationEndpoint) applier { @@ -2141,6 +2250,78 @@ func (s *Service) applyVariable(ctx context.Context, v variable) (influxdb.Varia return influxVar, nil } +func (s *Service) applyLabelMappingsV2(labelMappings []stateLabelMapping) applier { + const resource = "label_mapping" + + mutex := new(doMutex) + rollbackMappings := make([]stateLabelMapping, 0, len(labelMappings)) + + createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody { + var mapping stateLabelMapping + mutex.Do(func() { + mapping = labelMappings[i] + }) + + ident := mapping.resource.stateIdentity() + if exists(mapping.status) || mapping.label.ID() == 0 || ident.id == 0 { + // this block here does 2 things, it does not write a + // mapping when one exists. it also avoids having to worry + // about deleting an existing mapping since it will not be + // passed to the delete function below b/c it is never added + // to the list of mappings that is referenced in the delete + // call. + return nil + } + + m := influxdb.LabelMapping{ + LabelID: mapping.label.ID(), + ResourceID: ident.id, + ResourceType: ident.resourceType, + } + err := s.labelSVC.CreateLabelMapping(ctx, &m) + if err != nil { + return &applyErrBody{ + name: fmt.Sprintf("%s:%s:%s", ident.resourceType, ident.id, mapping.label.ID()), + msg: err.Error(), + } + } + + mutex.Do(func() { + rollbackMappings = append(rollbackMappings, mapping) + }) + + return nil + } + + return applier{ + creater: creater{ + entries: len(labelMappings), + fn: createFn, + }, + rollbacker: rollbacker{ + resource: resource, + fn: func(_ influxdb.ID) error { return s.rollbackLabelMappingsV2(rollbackMappings) }, + }, + } +} + +func (s *Service) rollbackLabelMappingsV2(mappings []stateLabelMapping) error { + var errs []string + for _, stateMapping := range mappings { + influxMapping := stateLabelMappingToInfluxLabelMapping(stateMapping) + err := s.labelSVC.DeleteLabelMapping(context.Background(), &influxMapping) + if err != nil { + errs = append(errs, fmt.Sprintf("%s:%s", stateMapping.label.ID(), stateMapping.resource.stateIdentity().id)) + } + } + + if len(errs) > 0 { + return fmt.Errorf(`label_resource_id_pairs=[%s] err="unable to delete label"`, strings.Join(errs, ", ")) + } + + return nil +} + func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applier { const resource = "label_mapping" @@ -2152,7 +2333,8 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie mutex.Do(func() { mapping = labelMappings[i] }) - if mapping.exists || mapping.LabelID == 0 || mapping.ResourceID == 0 { + + if exists(mapping.Status) || mapping.LabelID == 0 || mapping.ResourceID == 0 { // this block here does 2 things, it does not write a // mapping when one exists. it also avoids having to worry // about deleting an existing mapping since it will not be @@ -2228,14 +2410,14 @@ func (s *Service) deleteByIDs(resource string, numIDs int, deleteFn func(context return nil } -func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, pkg *Pkg) error { +func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb.ID, pkg *Pkg, state *stateCoordinator) error { stack, err := s.store.ReadStackByID(ctx, stackID) if err != nil { return err } var stackResources []StackResource - for _, b := range pkg.buckets() { + for _, b := range state.mBuckets { if b.shouldRemove { continue } @@ -2268,7 +2450,7 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb. Name: n.PkgName(), }) } - for _, l := range pkg.labels() { + for _, l := range state.mLabels { if l.shouldRemove { continue } @@ -2296,7 +2478,7 @@ func (s *Service) updateStackAfterSuccess(ctx context.Context, stackID influxdb. return s.store.UpdateStack(ctx, stack) } -func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb.ID, pkg *Pkg) error { +func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb.ID, pkg *Pkg, state *stateCoordinator) error { stack, err := s.store.ReadStackByID(ctx, stackID) if err != nil { return err @@ -2321,7 +2503,7 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb // these are the case where a deletion happens and is rolled back creating a new resource. // when resource is not to be removed this is a nothing burger, as it should be // rolled back to previous state. - for _, b := range pkg.buckets() { + for _, b := range state.mBuckets { if b.shouldRemove { res := existingResources[newKey(KindBucket, b.PkgName())] if res.ID != b.ID() { @@ -2348,7 +2530,7 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb } } } - for _, l := range pkg.labels() { + for _, l := range state.mLabels { if l.shouldRemove { res := existingResources[newKey(KindLabel, l.PkgName())] if res.ID != l.ID() { @@ -2375,14 +2557,6 @@ func (s *Service) updateStackAfterRollback(ctx context.Context, stackID influxdb return s.store.UpdateStack(ctx, stack) } -func (s *Service) findBucket(ctx context.Context, orgID influxdb.ID, b *bucket) (*influxdb.Bucket, error) { - if b.ID() != 0 { - return s.bucketSVC.FindBucketByID(ctx, b.ID()) - } - - return s.bucketSVC.FindBucketByName(ctx, orgID, b.Name()) -} - func (s *Service) findCheck(ctx context.Context, orgID influxdb.ID, c *check) (influxdb.Check, error) { if c.ID() != 0 { return s.checkSVC.FindCheckByID(ctx, c.ID()) @@ -2395,7 +2569,7 @@ func (s *Service) findCheck(ctx context.Context, orgID influxdb.ID, c *check) (i }) } -func (s *Service) findLabel(ctx context.Context, orgID influxdb.ID, l *label) (*influxdb.Label, error) { +func (s *Service) findLabel(ctx context.Context, orgID influxdb.ID, l *stateLabel) (*influxdb.Label, error) { if l.ID() != 0 { return s.labelSVC.FindLabelByID(ctx, l.ID()) } diff --git a/pkger/service_auth.go b/pkger/service_auth.go index 5e908ac3be..e257a6e439 100644 --- a/pkger/service_auth.go +++ b/pkger/service_auth.go @@ -44,6 +44,6 @@ func (s *authMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg return s.next.DryRun(ctx, orgID, userID, pkg, opts...) } -func (s *authMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) { +func (s *authMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) { return s.next.Apply(ctx, orgID, userID, pkg, opts...) } diff --git a/pkger/service_logging.go b/pkger/service_logging.go index f52a3388e5..4270ff1ecc 100644 --- a/pkger/service_logging.go +++ b/pkger/service_logging.go @@ -83,7 +83,7 @@ func (s *loggingMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg * return s.next.DryRun(ctx, orgID, userID, pkg, opts...) } -func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, err error) { +func (s *loggingMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, diff Diff, err error) { defer func(start time.Time) { dur := zap.Duration("took", time.Since(start)) if err != nil { diff --git a/pkger/service_metrics.go b/pkger/service_metrics.go index e0e504e774..49fcd1dccc 100644 --- a/pkger/service_metrics.go +++ b/pkger/service_metrics.go @@ -45,8 +45,8 @@ func (s *mwMetrics) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg * return sum, diff, rec(err) } -func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, error) { +func (s *mwMetrics) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (Summary, Diff, error) { rec := s.rec.Record("apply") - sum, err := s.next.Apply(ctx, orgID, userID, pkg, opts...) - return sum, rec(err) + sum, diff, err := s.next.Apply(ctx, orgID, userID, pkg, opts...) + return sum, diff, rec(err) } diff --git a/pkger/service_models.go b/pkger/service_models.go new file mode 100644 index 0000000000..3e461e0660 --- /dev/null +++ b/pkger/service_models.go @@ -0,0 +1,409 @@ +package pkger + +import ( + "sort" + + "github.com/influxdata/influxdb/v2" +) + +type stateCoordinator struct { + mBuckets map[string]*stateBucket + + mLabels map[string]*stateLabel + + labelMappings []stateLabelMapping +} + +func newStateCoordinator(pkg *Pkg) *stateCoordinator { + state := stateCoordinator{ + mBuckets: make(map[string]*stateBucket), + mLabels: make(map[string]*stateLabel), + } + + for _, pkgBkt := range pkg.buckets() { + state.mBuckets[pkgBkt.PkgName()] = &stateBucket{ + bucket: pkgBkt, + } + } + + for _, pkgLabel := range pkg.labels() { + state.mLabels[pkgLabel.PkgName()] = &stateLabel{ + label: pkgLabel, + } + } + + return &state +} + +func (s *stateCoordinator) buckets() []*stateBucket { + out := make([]*stateBucket, 0, len(s.mBuckets)) + for _, v := range s.mBuckets { + out = append(out, v) + } + return out +} + +func (s *stateCoordinator) labels() []*stateLabel { + out := make([]*stateLabel, 0, len(s.mLabels)) + for _, v := range s.mLabels { + out = append(out, v) + } + return out +} + +func (s *stateCoordinator) diff() Diff { + var diff Diff + for _, b := range s.mBuckets { + diff.Buckets = append(diff.Buckets, b.diffBucket()) + } + sort.Slice(diff.Buckets, func(i, j int) bool { + return diff.Buckets[i].PkgName < diff.Buckets[j].PkgName + }) + + for _, l := range s.mLabels { + diff.Labels = append(diff.Labels, l.diffLabel()) + } + sort.Slice(diff.Labels, func(i, j int) bool { + return diff.Labels[i].PkgName < diff.Labels[j].PkgName + }) + + for _, m := range s.labelMappings { + diff.LabelMappings = append(diff.LabelMappings, m.diffLabelMapping()) + } + sort.Slice(diff.LabelMappings, func(i, j int) bool { + n, m := diff.LabelMappings[i], diff.LabelMappings[j] + if n.ResType < m.ResType { + return true + } + if n.ResType > m.ResType { + return false + } + if n.ResPkgName < m.ResPkgName { + return true + } + if n.ResPkgName > m.ResPkgName { + return false + } + return n.LabelName < m.LabelName + }) + + return diff +} + +func (s *stateCoordinator) summary() Summary { + var sum Summary + for _, b := range s.buckets() { + if b.shouldRemove { + continue + } + sum.Buckets = append(sum.Buckets, b.summarize()) + } + sort.Slice(sum.Buckets, func(i, j int) bool { + return sum.Buckets[i].PkgName < sum.Buckets[j].PkgName + }) + + for _, l := range s.labels() { + if l.shouldRemove { + continue + } + sum.Labels = append(sum.Labels, l.summarize()) + } + sort.Slice(sum.Labels, func(i, j int) bool { + return sum.Labels[i].PkgName < sum.Labels[j].PkgName + }) + + for _, m := range s.labelMappings { + sum.LabelMappings = append(sum.LabelMappings, m.summarize()) + } + sort.Slice(sum.LabelMappings, func(i, j int) bool { + n, m := sum.LabelMappings[i], sum.LabelMappings[j] + if n.ResourceType != m.ResourceType { + return n.ResourceType < m.ResourceType + } + if n.ResourcePkgName != m.ResourcePkgName { + return n.ResourcePkgName < m.ResourcePkgName + } + return n.LabelName < m.LabelName + }) + + return sum +} + +func (s *stateCoordinator) getLabelByPkgName(pkgName string) *stateLabel { + return s.mLabels[pkgName] +} + +func (s *stateCoordinator) Contains(k Kind, pkgName string) bool { + _, ok := s.getObjectIDSetter(k, pkgName) + return ok +} + +// setObjectID sets the id for the resource graphed from the object the key identifies. +func (s *stateCoordinator) setObjectID(k Kind, pkgName string, id influxdb.ID) { + idSetFn, ok := s.getObjectIDSetter(k, pkgName) + if !ok { + return + } + idSetFn(id) +} + +// setObjectID sets the id for the resource graphed from the object the key identifies. +// The pkgName and kind are used as the unique identifier, when calling this it will +// overwrite any existing value if one exists. If desired, check for the value by using +// the Contains method. +func (s *stateCoordinator) addObjectForRemoval(k Kind, pkgName string, id influxdb.ID) { + newIdentity := identity{ + name: &references{val: pkgName}, + } + + switch k { + case KindBucket: + s.mBuckets[pkgName] = &stateBucket{ + id: id, + bucket: &bucket{identity: newIdentity}, + shouldRemove: true, + } + case KindLabel: + s.mLabels[pkgName] = &stateLabel{ + id: id, + label: &label{identity: newIdentity}, + shouldRemove: true, + } + } +} + +func (s *stateCoordinator) getObjectIDSetter(k Kind, pkgName string) (func(influxdb.ID), bool) { + switch k { + case KindBucket: + r, ok := s.mBuckets[pkgName] + return func(id influxdb.ID) { + r.id = id + }, ok + case KindLabel: + r, ok := s.mLabels[pkgName] + return func(id influxdb.ID) { + r.id = id + }, ok + default: + return nil, false + } +} + +type stateIdentity struct { + id influxdb.ID + name string + pkgName string + resourceType influxdb.ResourceType + shouldRemove bool +} + +func (s stateIdentity) exists() bool { + return s.id != 0 +} + +type stateBucket struct { + id, orgID influxdb.ID + shouldRemove bool + + existing *influxdb.Bucket + + *bucket +} + +func (b *stateBucket) diffBucket() DiffBucket { + diff := DiffBucket{ + DiffIdentifier: DiffIdentifier{ + ID: SafeID(b.ID()), + Remove: b.shouldRemove, + PkgName: b.PkgName(), + }, + New: DiffBucketValues{ + Name: b.Name(), + Description: b.Description, + RetentionRules: b.RetentionRules, + }, + } + if e := b.existing; e != nil { + diff.Old = &DiffBucketValues{ + Name: e.Name, + Description: e.Description, + } + if e.RetentionPeriod > 0 { + diff.Old.RetentionRules = retentionRules{newRetentionRule(e.RetentionPeriod)} + } + } + return diff +} + +func (b *stateBucket) summarize() SummaryBucket { + sum := b.bucket.summarize() + sum.ID = SafeID(b.ID()) + sum.OrgID = SafeID(b.orgID) + return sum +} + +func (b *stateBucket) Exists() bool { + return b.existing != nil +} + +func (b *stateBucket) ID() influxdb.ID { + if b.Exists() { + return b.existing.ID + } + return b.id +} + +func (b *stateBucket) resourceType() influxdb.ResourceType { + return KindBucket.ResourceType() +} + +func (b *stateBucket) labels() []*label { + return b.bucket.labels +} + +func (b *stateBucket) stateIdentity() stateIdentity { + return stateIdentity{ + id: b.ID(), + name: b.Name(), + pkgName: b.PkgName(), + resourceType: b.resourceType(), + shouldRemove: b.shouldRemove, + } +} + +func (b *stateBucket) shouldApply() bool { + return b.shouldRemove || + b.existing == nil || + b.Description != b.existing.Description || + b.Name() != b.existing.Name || + b.RetentionRules.RP() != b.existing.RetentionPeriod +} + +type stateLabel struct { + id, orgID influxdb.ID + shouldRemove bool + + existing *influxdb.Label + + *label +} + +func (l *stateLabel) diffLabel() DiffLabel { + diff := DiffLabel{ + DiffIdentifier: DiffIdentifier{ + ID: SafeID(l.ID()), + Remove: l.shouldRemove, + PkgName: l.PkgName(), + }, + New: DiffLabelValues{ + Name: l.Name(), + Description: l.Description, + Color: l.Color, + }, + } + if e := l.existing; e != nil { + diff.Old = &DiffLabelValues{ + Name: e.Name, + Description: e.Properties["description"], + Color: e.Properties["color"], + } + } + return diff +} + +func (l *stateLabel) summarize() SummaryLabel { + sum := l.label.summarize() + sum.ID = SafeID(l.ID()) + sum.OrgID = SafeID(l.orgID) + return sum +} + +func (l *stateLabel) Exists() bool { + return l.existing != nil +} + +func (l *stateLabel) ID() influxdb.ID { + if l.Exists() { + return l.existing.ID + } + return l.id +} + +func (l *stateLabel) shouldApply() bool { + return l.existing == nil || + l.Description != l.existing.Properties["description"] || + l.Name() != l.existing.Name || + l.Color != l.existing.Properties["color"] +} + +func (l *stateLabel) toInfluxLabel() influxdb.Label { + return influxdb.Label{ + ID: l.ID(), + OrgID: l.orgID, + Name: l.Name(), + Properties: l.properties(), + } +} + +func (l *stateLabel) properties() map[string]string { + return map[string]string{ + "color": l.Color, + "description": l.Description, + } +} + +type stateLabelMapping struct { + status StateStatus + + resource interface { + stateIdentity() stateIdentity + } + + label *stateLabel +} + +func (lm stateLabelMapping) diffLabelMapping() DiffLabelMapping { + ident := lm.resource.stateIdentity() + return DiffLabelMapping{ + StateStatus: lm.status, + ResType: ident.resourceType, + ResID: SafeID(ident.id), + ResPkgName: ident.pkgName, + ResName: ident.name, + LabelID: SafeID(lm.label.ID()), + LabelPkgName: lm.label.PkgName(), + LabelName: lm.label.Name(), + } +} + +func (lm stateLabelMapping) summarize() SummaryLabelMapping { + ident := lm.resource.stateIdentity() + return SummaryLabelMapping{ + Status: lm.status, + ResourceID: SafeID(ident.id), + ResourcePkgName: ident.pkgName, + ResourceName: ident.name, + ResourceType: ident.resourceType, + LabelPkgName: lm.label.PkgName(), + LabelName: lm.label.Name(), + LabelID: SafeID(lm.label.ID()), + } +} + +func stateLabelMappingToInfluxLabelMapping(mapping stateLabelMapping) influxdb.LabelMapping { + ident := mapping.resource.stateIdentity() + return influxdb.LabelMapping{ + LabelID: mapping.label.ID(), + ResourceID: ident.id, + ResourceType: ident.resourceType, + } +} + +// IsNew identifies state status as new to the platform. +func IsNew(status StateStatus) bool { + return status == StateStatusNew +} + +func exists(status StateStatus) bool { + return status == StateStatusExists +} diff --git a/pkger/service_test.go b/pkger/service_test.go index 5c8e880d39..359813e0dc 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -474,7 +474,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Buckets, 2) @@ -496,29 +496,35 @@ func TestService(t *testing.T) { testfileRunner(t, "testdata/bucket.yml", func(t *testing.T, pkg *Pkg) { orgID := influxdb.ID(9000) - pkg.isVerified = true - stubExisting := func(name string, id influxdb.ID) { - pkgBkt := pkg.mBuckets[name] - pkgBkt.existing = &influxdb.Bucket{ - // makes all pkg changes same as they are on thes existing bucket - ID: id, - OrgID: orgID, - Name: pkgBkt.Name(), - Description: pkgBkt.Description, - RetentionPeriod: pkgBkt.RetentionRules.RP(), - } - } - stubExisting("rucket_11", 3) - stubExisting("rucket_22", 4) - fakeBktSVC := mock.NewBucketService() + fakeBktSVC.FindBucketByNameFn = func(ctx context.Context, oid influxdb.ID, name string) (*influxdb.Bucket, error) { + if orgID != oid { + return nil, errors.New("invalid org id") + } + + id := influxdb.ID(3) + if name == "display name" { + id = 4 + name = "rucket_22" + } + if bkt, ok := pkg.mBuckets[name]; ok { + return &influxdb.Bucket{ + ID: id, + OrgID: oid, + Name: bkt.Name(), + Description: bkt.Description, + RetentionPeriod: bkt.RetentionRules.RP(), + }, nil + } + return nil, errors.New("not found") + } fakeBktSVC.UpdateBucketFn = func(_ context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) { return &influxdb.Bucket{ID: id}, nil } svc := newTestService(WithBucketSVC(fakeBktSVC)) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Buckets, 2) @@ -559,7 +565,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.GreaterOrEqual(t, fakeBktSVC.DeleteBucketCalls.Count(), 1) @@ -580,7 +586,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Checks, 2) @@ -626,7 +632,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.GreaterOrEqual(t, fakeCheckSVC.DeleteCheckCalls.Count(), 1) @@ -651,7 +657,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Labels, 3) @@ -704,7 +710,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelCalls.Count(), 1) @@ -712,13 +718,12 @@ func TestService(t *testing.T) { }) t.Run("will not apply label if no changes to be applied", func(t *testing.T) { - testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/label.yml", func(t *testing.T, pkg *Pkg) { orgID := influxdb.ID(9000) - pkg.isVerified = true - stubExisting := func(name string, id influxdb.ID) { + stubExisting := func(name string, id influxdb.ID) *influxdb.Label { pkgLabel := pkg.mLabels[name] - pkgLabel.existing = &influxdb.Label{ + return &influxdb.Label{ // makes all pkg changes same as they are on the existing ID: id, OrgID: orgID, @@ -733,6 +738,18 @@ func TestService(t *testing.T) { stubExisting("label_3", 3) fakeLabelSVC := mock.NewLabelService() + fakeLabelSVC.FindLabelsFn = func(ctx context.Context, f influxdb.LabelFilter) ([]*influxdb.Label, error) { + if f.Name != "label_1" && f.Name != "display name" { + return nil, nil + } + id := influxdb.ID(1) + name := f.Name + if f.Name == "display name" { + id = 3 + name = "label_3" + } + return []*influxdb.Label{stubExisting(name, id)}, nil + } fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error { if l.Name == "label_2" { l.ID = 2 @@ -748,7 +765,7 @@ func TestService(t *testing.T) { svc := newTestService(WithLabelSVC(fakeLabelSVC)) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Labels, 3) @@ -802,7 +819,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Dashboards, 1) @@ -837,7 +854,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.True(t, deletedDashs[1]) @@ -846,9 +863,83 @@ func TestService(t *testing.T) { }) t.Run("label mapping", func(t *testing.T) { + testLabelMappingV2ApplyFn := func(t *testing.T, filename string, numExpected int, settersFn func() []ServiceSetterFn) { + testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) { + t.Helper() + + fakeLabelSVC := mock.NewLabelService() + fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error { + l.ID = influxdb.ID(rand.Int()) + return nil + } + fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error { + if mapping.ResourceID == 0 { + return errors.New("did not get a resource ID") + } + if mapping.ResourceType == "" { + return errors.New("did not get a resource type") + } + return nil + } + svc := newTestService(append(settersFn(), + WithLabelSVC(fakeLabelSVC), + WithLogger(zaptest.NewLogger(t)), + )...) + + orgID := influxdb.ID(9000) + + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + require.NoError(t, err) + + assert.Equal(t, numExpected, fakeLabelSVC.CreateLabelMappingCalls.Count()) + }) + } + + testLabelMappingV2RollbackFn := func(t *testing.T, filename string, killCount int, settersFn func() []ServiceSetterFn) { + testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) { + t.Helper() + + fakeLabelSVC := mock.NewLabelService() + fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error { + l.ID = influxdb.ID(fakeLabelSVC.CreateLabelCalls.Count() + 1) + return nil + } + fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, m *influxdb.LabelMapping) error { + t.Logf("delete: %+v", m) + return nil + } + fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error { + t.Logf("create: %+v", mapping) + if mapping.ResourceID == 0 { + return errors.New("did not get a resource ID") + } + if mapping.ResourceType == "" { + return errors.New("did not get a resource type") + } + if fakeLabelSVC.CreateLabelMappingCalls.Count() == killCount { + return errors.New("hit last label") + } + return nil + } + svc := newTestService(append(settersFn(), + WithLabelSVC(fakeLabelSVC), + WithLogger(zaptest.NewLogger(t)), + )...) + + orgID := influxdb.ID(9000) + + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + require.Error(t, err) + + assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelMappingCalls.Count(), killCount) + }) + } + testLabelMappingFn := func(t *testing.T, filename string, numExpected int, settersFn func() []ServiceSetterFn) { t.Run("applies successfully", func(t *testing.T) { testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) { + t.Helper() + fakeLabelSVC := mock.NewLabelService() fakeLabelSVC.CreateLabelFn = func(_ context.Context, l *influxdb.Label) error { l.ID = influxdb.ID(rand.Int()) @@ -870,7 +961,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) assert.Equal(t, numExpected, fakeLabelSVC.CreateLabelMappingCalls.Count()) @@ -879,6 +970,8 @@ func TestService(t *testing.T) { t.Run("deletes new label mappings on error", func(t *testing.T) { testfileRunner(t, filename, func(t *testing.T, pkg *Pkg) { + t.Helper() + for _, l := range pkg.mLabels { for resource, vals := range l.mappings { // create extra label mappings, enough for delete to ahve head room @@ -893,7 +986,12 @@ func TestService(t *testing.T) { l.ID = influxdb.ID(fakeLabelSVC.CreateLabelCalls.Count() + 1) return nil } + fakeLabelSVC.DeleteLabelMappingFn = func(_ context.Context, m *influxdb.LabelMapping) error { + t.Logf("delete: %+v", m) + return nil + } fakeLabelSVC.CreateLabelMappingFn = func(_ context.Context, mapping *influxdb.LabelMapping) error { + t.Logf("create: %+v", mapping) if mapping.ResourceID == 0 { return errors.New("did not get a resource ID") } @@ -912,7 +1010,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.GreaterOrEqual(t, fakeLabelSVC.DeleteLabelMappingCalls.Count(), numExpected) @@ -921,23 +1019,26 @@ func TestService(t *testing.T) { } t.Run("maps buckets with labels", func(t *testing.T) { - testLabelMappingFn( - t, - "testdata/bucket_associates_label.yml", - 4, - func() []ServiceSetterFn { - fakeBktSVC := mock.NewBucketService() - fakeBktSVC.CreateBucketFn = func(_ context.Context, b *influxdb.Bucket) error { - b.ID = influxdb.ID(rand.Int()) - return nil - } - fakeBktSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) { - // forces the bucket to be created a new - return nil, errors.New("an error") - } - return []ServiceSetterFn{WithBucketSVC(fakeBktSVC)} - }, - ) + bktOpt := func() []ServiceSetterFn { + fakeBktSVC := mock.NewBucketService() + fakeBktSVC.CreateBucketFn = func(_ context.Context, b *influxdb.Bucket) error { + b.ID = influxdb.ID(rand.Int()) + return nil + } + fakeBktSVC.FindBucketByNameFn = func(_ context.Context, id influxdb.ID, s string) (*influxdb.Bucket, error) { + // forces the bucket to be created a new + return nil, errors.New("an error") + } + return []ServiceSetterFn{WithBucketSVC(fakeBktSVC)} + } + + t.Run("applies successfully", func(t *testing.T) { + testLabelMappingV2ApplyFn(t, "testdata/bucket_associates_label.yml", 4, bktOpt) + }) + + t.Run("deletes new label mappings on error", func(t *testing.T) { + testLabelMappingV2RollbackFn(t, "testdata/bucket_associates_label.yml", 2, bktOpt) + }) }) t.Run("maps checks with labels", func(t *testing.T) { @@ -1100,7 +1201,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.NotificationEndpoints, 5) @@ -1153,7 +1254,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 5) @@ -1189,7 +1290,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.NotificationRules, 1) @@ -1240,7 +1341,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.Equal(t, 1, fakeRuleStore.DeleteNotificationRuleCalls.Count()) @@ -1274,7 +1375,7 @@ func TestService(t *testing.T) { svc := newTestService(WithTaskSVC(fakeTaskSVC)) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Tasks, 2) @@ -1302,7 +1403,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.Equal(t, 1, fakeTaskSVC.DeleteTaskCalls.Count()) @@ -1323,7 +1424,7 @@ func TestService(t *testing.T) { svc := newTestService(WithTelegrafSVC(fakeTeleSVC)) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.TelegrafConfigs, 1) @@ -1355,7 +1456,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.Equal(t, 1, fakeTeleSVC.DeleteTelegrafConfigCalls.Count()) @@ -1376,7 +1477,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Variables, 4) @@ -1409,7 +1510,7 @@ func TestService(t *testing.T) { orgID := influxdb.ID(9000) - _, err := svc.Apply(context.TODO(), orgID, 0, pkg) + _, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.Error(t, err) assert.GreaterOrEqual(t, fakeVarSVC.DeleteVariableCalls.Count(), 1) @@ -1420,20 +1521,21 @@ func TestService(t *testing.T) { testfileRunner(t, "testdata/variables.yml", func(t *testing.T, pkg *Pkg) { orgID := influxdb.ID(9000) - pkg.isVerified = true - pkgLabel := pkg.mVariables["var_const_3"] - pkgLabel.existing = &influxdb.Variable{ - // makes all pkg changes same as they are on the existing - ID: influxdb.ID(1), - OrganizationID: orgID, - Name: pkgLabel.Name(), - Arguments: &influxdb.VariableArguments{ - Type: "constant", - Values: influxdb.VariableConstantValues{"first val"}, - }, - } - fakeVarSVC := mock.NewVariableService() + fakeVarSVC.FindVariablesF = func(ctx context.Context, f influxdb.VariableFilter, _ ...influxdb.FindOptions) ([]*influxdb.Variable, error) { + return []*influxdb.Variable{ + { + // makes all pkg changes same as they are on the existing + ID: influxdb.ID(1), + OrganizationID: orgID, + Name: pkg.mVariables["var_const_3"].Name(), + Arguments: &influxdb.VariableArguments{ + Type: "constant", + Values: influxdb.VariableConstantValues{"first val"}, + }, + }, + }, nil + } fakeVarSVC.CreateVariableF = func(_ context.Context, l *influxdb.Variable) error { if l.Name == "var_const" { return errors.New("shouldn't get here") @@ -1449,7 +1551,7 @@ func TestService(t *testing.T) { svc := newTestService(WithVariableSVC(fakeVarSVC)) - sum, err := svc.Apply(context.TODO(), orgID, 0, pkg) + sum, _, err := svc.Apply(context.TODO(), orgID, 0, pkg) require.NoError(t, err) require.Len(t, sum.Variables, 4) diff --git a/pkger/service_tracing.go b/pkger/service_tracing.go index 220a655fc2..ad7eaf0223 100644 --- a/pkger/service_tracing.go +++ b/pkger/service_tracing.go @@ -39,7 +39,7 @@ func (s *traceMW) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk return s.next.DryRun(ctx, orgID, userID, pkg, opts...) } -func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, err error) { +func (s *traceMW) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg, opts ...ApplyOptFn) (sum Summary, diff Diff, err error) { span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "Apply") span.LogKV("orgID", orgID.String(), "userID", userID.String()) defer span.Finish() diff --git a/tenant/service_org.go b/tenant/service_org.go index 4ef2a92c18..5619e506cb 100644 --- a/tenant/service_org.go +++ b/tenant/service_org.go @@ -153,7 +153,7 @@ func (s *Service) UpdateOrganization(ctx context.Context, id influxdb.ID, upd in return org, nil } -// Removes a organization by ID. +// DeleteOrganization removes a organization by ID and its dependent resources. func (s *Service) DeleteOrganization(ctx context.Context, id influxdb.ID) error { err := s.store.Update(ctx, func(tx kv.Tx) error { // clean up the buckets for this organization