diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index c07bf70fa4..d8aab08e32 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -840,6 +840,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)), pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)), pkger.WithLabelSVC(authorizer.NewLabelService(b.LabelService)), + pkger.WithNoticationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, b.UserResourceMappingService, b.OrganizationService)), pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)), pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)), ) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 7f5b81bb75..6a37c4ad3d 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -344,6 +344,11 @@ func (tl *TestLauncher) LabelService(tb testing.TB) *http.LabelService { return &http.LabelService{Client: tl.HTTPClient(tb)} } +func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.NotificationEndpointService { + tb.Helper() + return http.NewNotificationEndpointService(tl.HTTPClient(tb)) +} + func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService { tb.Helper() return http.NewTelegrafService(tl.HTTPClient(tb)) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 23c71bce67..f4198b4b5c 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -3,7 +3,6 @@ package launcher_test import ( "context" "errors" - "sync" "testing" "time" @@ -24,6 +23,7 @@ func TestLauncher_Pkger(t *testing.T) { pkger.WithBucketSVC(l.BucketService(t)), pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), + pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -51,6 +51,7 @@ func TestLauncher_Pkger(t *testing.T) { LabelService: l.LabelService(t), killCount: 2, // hits error on 3rd attempt at creating a mapping }), + pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -78,6 +79,18 @@ func TestLauncher_Pkger(t *testing.T) { require.NoError(t, err) assert.Empty(t, dashs) + endpoints, _, err := l.NotificationEndpointService(t).FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{ + OrgID: &l.Org.ID, + }) + require.NoError(t, err) + assert.Empty(t, endpoints) + + teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{ + OrgID: &l.Org.ID, + }) + require.NoError(t, err) + assert.Empty(t, teles) + vars, err := l.VariableService(t).FindVariables(ctx, influxdb.VariableFilter{OrganizationID: &l.Org.ID}) require.NoError(t, err) assert.Empty(t, vars) @@ -131,6 +144,8 @@ func TestLauncher_Pkger(t *testing.T) { assert.True(t, diffVars[0].IsNew()) require.Len(t, diff.Dashboards, 1) + require.Len(t, diff.NotificationEndpoints, 1) + require.Len(t, diff.Telegrafs, 1) labels := sum.Labels require.Len(t, labels, 1) @@ -147,6 +162,18 @@ func TestLauncher_Pkger(t *testing.T) { assert.Equal(t, "desc1", dashs[0].Description) hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1") + endpoints := sum.NotificationEndpoints + require.Len(t, endpoints, 1) + assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].GetName()) + assert.Equal(t, "http none auth desc", endpoints[0].GetDescription()) + hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1") + + teles := sum.TelegrafConfigs + require.Len(t, teles, 1) + assert.Equal(t, "first_tele_config", teles[0].Name) + assert.Equal(t, "desc", teles[0].Description) + hasLabelAssociations(t, teles[0].LabelAssociations, 1, "label_1") + vars := sum.Variables require.Len(t, vars, 1) assert.Equal(t, "var_query_1", vars[0].Name) @@ -167,25 +194,25 @@ func TestLauncher_Pkger(t *testing.T) { labels := sum1.Labels require.Len(t, labels, 1) - assert.NotEqual(t, influxdb.ID(0), labels[0].ID) + assert.NotZero(t, labels[0].ID) assert.Equal(t, "label_1", labels[0].Name) bkts := sum1.Buckets require.Len(t, bkts, 1) - assert.NotEqual(t, influxdb.ID(0), bkts[0].ID) + assert.NotZero(t, bkts[0].ID) assert.Equal(t, "rucket_1", bkts[0].Name) hasLabelAssociations(t, bkts[0].LabelAssociations, 1, "label_1") dashs := sum1.Dashboards require.Len(t, dashs, 1) - assert.NotEqual(t, influxdb.ID(0), dashs[0].ID) + assert.NotZero(t, dashs[0].ID) assert.Equal(t, "dash_1", dashs[0].Name) assert.Equal(t, "desc1", dashs[0].Description) hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1") vars := sum1.Variables require.Len(t, vars, 1) - assert.NotEqual(t, influxdb.ID(0), vars[0].ID) + assert.NotZero(t, vars[0].ID) assert.Equal(t, "var_query_1", vars[0].Name) hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1") varArgs := vars[0].Arguments @@ -328,6 +355,7 @@ func TestLauncher_Pkger(t *testing.T) { }), pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), + pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -430,6 +458,15 @@ spec: bucket = "rucket_3" [[inputs.cpu]] percpu = true + - kind: NotificationEndpointHTTP + name: http_none_auth_notification_endpoint + type: none + description: http none auth desc + url: https://www.example.com/endpoint/noneauth + status: inactive + associations: + - kind: Label + name: label_1 ` const updatePkgYMLStr = `apiVersion: 0.1.0 @@ -477,18 +514,14 @@ func (f *fakeBucketSVC) UpdateBucket(ctx context.Context, id influxdb.ID, upd in type fakeLabelSVC struct { influxdb.LabelService - countMu sync.Mutex - callCount int + callCount mock.SafeCount killCount int } func (f *fakeLabelSVC) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error { - f.countMu.Lock() - if f.callCount == f.killCount { - f.countMu.Unlock() + defer f.callCount.IncrFn()() + if f.callCount.Count() == f.killCount { return errors.New("reached kill count") } - f.callCount++ - f.countMu.Unlock() return f.LabelService.CreateLabelMapping(ctx, m) } diff --git a/http/notification_endpoint.go b/http/notification_endpoint.go index 77fbf3b84a..bac085c40a 100644 --- a/http/notification_endpoint.go +++ b/http/notification_endpoint.go @@ -609,7 +609,7 @@ var _ influxdb.NotificationEndpointService = (*NotificationEndpointService)(nil) // FindNotificationEndpointByID returns a single notification endpoint by ID. func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) { - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. Get(prefixNotificationEndpoints, id.String()). DecodeJSON(&resp). @@ -617,7 +617,7 @@ func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.C if err != nil { return nil, err } - return resp.NotificationEndpoint, nil + return resp.endpoint, nil } // FindNotificationEndpoints returns a list of notification endpoints that match filter and the total count of matching notification endpoints. @@ -634,7 +634,9 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont params = append(params, [2]string{"org", *filter.Org}) } - var resp notificationEndpointsResponse + var resp struct { + Endpoints []notificationEndpointDecoder `json:"notificationEndpoints"` + } err := s.Client. Get(prefixNotificationEndpoints). QueryParams(params...). @@ -645,11 +647,10 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont } var endpoints []influxdb.NotificationEndpoint - for _, e := range resp.NotificationEndpoints { - endpoints = append(endpoints, e.NotificationEndpoint) + for _, e := range resp.Endpoints { + endpoints = append(endpoints, e.endpoint) } - - return endpoints, len(resp.NotificationEndpoints), nil + return endpoints, len(endpoints), nil } // CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier. @@ -658,7 +659,7 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, ne influxdb.NotificationEndpoint, userID influxdb.ID) error { // userID is ignored here since server reads it off // the token/auth. its a nothing burger here - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. Post(httpc.BodyJSON(ne), prefixNotificationEndpoints). DecodeJSON(&resp). @@ -667,15 +668,15 @@ func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Con return err } // :sadpanda: - ne.SetID(resp.GetID()) - ne.SetOrgID(resp.GetOrgID()) + ne.SetID(resp.endpoint.GetID()) + ne.SetOrgID(resp.endpoint.GetOrgID()) return nil } // UpdateNotificationEndpoint updates a single notification endpoint. // Returns the new notification endpoint after update. func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) { - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. Put(httpc.BodyJSON(nr), prefixNotificationEndpoints, id.String()). DecodeJSON(&resp). @@ -683,7 +684,7 @@ func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Con if err != nil { return nil, err } - return resp.NotificationEndpoint, nil + return resp.endpoint, nil } // PatchNotificationEndpoint updates a single notification endpoint with changeset. @@ -693,7 +694,7 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont return nil, err } - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. Patch(httpc.BodyJSON(upd), prefixNotificationEndpoints, id.String()). DecodeJSON(&resp). @@ -701,10 +702,23 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont if err != nil { return nil, err } - return resp.NotificationEndpoint, nil + return resp.endpoint, nil } // DeleteNotificationEndpoint removes a notification endpoint by ID, returns secret fields, orgID for further deletion. func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) { panic("not implemented") } + +type notificationEndpointDecoder struct { + endpoint influxdb.NotificationEndpoint +} + +func (n *notificationEndpointDecoder) UnmarshalJSON(b []byte) error { + newEndpoint, err := endpoint.UnmarshalJSON(b) + if err != nil { + return err + } + n.endpoint = newEndpoint + return nil +} diff --git a/pkger/models.go b/pkger/models.go index a2474aedf5..10440fca68 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -19,6 +19,7 @@ const ( KindBucket Kind = "bucket" KindDashboard Kind = "dashboard" KindLabel Kind = "label" + KindNotificationEndpoint Kind = "notificationendpoint" KindNotificationEndpointPagerDuty Kind = "notificationendpointpagerduty" KindNotificationEndpointHTTP Kind = "notificationendpointhttp" KindNotificationEndpointSlack Kind = "notificationendpointslack" @@ -79,7 +80,8 @@ func (k Kind) ResourceType() influxdb.ResourceType { return influxdb.DashboardsResourceType case KindLabel: return influxdb.LabelsResourceType - case KindNotificationEndpointHTTP, + case KindNotificationEndpoint, + KindNotificationEndpointHTTP, KindNotificationEndpointPagerDuty, KindNotificationEndpointSlack: return influxdb.NotificationEndpointResourceType @@ -133,12 +135,13 @@ type Metadata struct { // Diff is the result of a service DryRun call. The diff outlines // what is new and or updated from the current state of the platform. type Diff struct { - Buckets []DiffBucket `json:"buckets"` - Dashboards []DiffDashboard `json:"dashboards"` - Labels []DiffLabel `json:"labels"` - LabelMappings []DiffLabelMapping `json:"labelMappings"` - Telegrafs []DiffTelegraf `json:"telegrafConfigs"` - Variables []DiffVariable `json:"variables"` + Buckets []DiffBucket `json:"buckets"` + Dashboards []DiffDashboard `json:"dashboards"` + Labels []DiffLabel `json:"labels"` + LabelMappings []DiffLabelMapping `json:"labelMappings"` + NotificationEndpoints []DiffNotificationEndpoint `json:"notificationEndpoints"` + Telegrafs []DiffTelegraf `json:"telegrafConfigs"` + Variables []DiffVariable `json:"variables"` } // HasConflicts provides a binary t/f if there are any changes within package @@ -291,6 +294,35 @@ type DiffLabelMapping struct { LabelName string `json:"labelName"` } +// DiffNotificationEndpointValues are the varying values for a notification endpoint. +type DiffNotificationEndpointValues struct { + influxdb.NotificationEndpoint +} + +// DiffNotificationEndpoint is a diff of an individual notification endpoint. +type DiffNotificationEndpoint struct { + ID SafeID `json:"id"` + Name string `json:"name"` + New DiffNotificationEndpointValues `json:"new"` + Old *DiffNotificationEndpointValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil +} + +func newDiffNotificationEndpoint(ne *notificationEndpoint, i influxdb.NotificationEndpoint) DiffNotificationEndpoint { + diff := DiffNotificationEndpoint{ + Name: ne.Name(), + New: DiffNotificationEndpointValues{ + NotificationEndpoint: ne.summarize().NotificationEndpoint, + }, + } + if i != nil { + diff.ID = SafeID(i.GetID()) + diff.Old = &DiffNotificationEndpointValues{ + NotificationEndpoint: i, + } + } + return diff +} + // DiffTelegraf is a diff of an individual telegraf. type DiffTelegraf struct { influxdb.TelegrafConfig @@ -488,6 +520,10 @@ func (b *bucket) ID() influxdb.ID { return b.id } +func (b *bucket) Labels() []*label { + return b.labels +} + func (b *bucket) Name() string { return b.name } @@ -524,6 +560,16 @@ func (b *bucket) shouldApply() bool { b.RetentionRules.RP() != b.existing.RetentionPeriod } +type mapperBuckets []*bucket + +func (b mapperBuckets) Association(i int) labelAssociater { + return b[i] +} + +func (b mapperBuckets) Len() int { + return len(b) +} + const ( retentionRuleTypeExpire = "expire" ) @@ -765,6 +811,8 @@ const ( type notificationEndpoint struct { kind notificationKind + id influxdb.ID + OrgID influxdb.ID name string description string password string @@ -776,6 +824,23 @@ type notificationEndpoint struct { username string labels sortedLabels + + existing influxdb.NotificationEndpoint +} + +func (n *notificationEndpoint) Exists() bool { + return n.existing != nil +} + +func (n *notificationEndpoint) ID() influxdb.ID { + if n.existing != nil { + return n.existing.GetID() + } + return n.id +} + +func (n *notificationEndpoint) Labels() []*label { + return n.labels } func (n *notificationEndpoint) Name() string { @@ -786,12 +851,18 @@ func (n *notificationEndpoint) ResourceType() influxdb.ResourceType { return KindNotificationEndpointSlack.ResourceType() } -func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint { - base := endpoint.Base{ +func (n *notificationEndpoint) base() endpoint.Base { + return endpoint.Base{ + ID: n.ID(), + OrgID: n.OrgID, Name: n.Name(), Description: n.description, Status: influxdb.TaskStatusActive, } +} + +func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint { + base := n.base() if n.status != "" { base.Status = influxdb.Status(n.status) } @@ -891,6 +962,16 @@ func (n *notificationEndpoint) valid() []validationErr { return failures } +type mapperNotificationEndpoints []*notificationEndpoint + +func (n mapperNotificationEndpoints) Association(i int) labelAssociater { + return n[i] +} + +func (n mapperNotificationEndpoints) Len() int { + return len(n) +} + const ( fieldTelegrafConfig = "config" ) @@ -905,6 +986,10 @@ func (t *telegraf) ID() influxdb.ID { return t.config.ID } +func (t *telegraf) Labels() []*label { + return t.labels +} + func (t *telegraf) Name() string { return t.config.Name } @@ -924,6 +1009,16 @@ func (t *telegraf) summarize() SummaryTelegraf { } } +type mapperTelegrafs []*telegraf + +func (m mapperTelegrafs) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperTelegrafs) Len() int { + return len(m) +} + const ( fieldArgTypeConstant = "constant" fieldArgTypeMap = "map" @@ -957,6 +1052,10 @@ func (v *variable) Exists() bool { return v.existing != nil } +func (v *variable) Labels() []*label { + return v.labels +} + func (v *variable) Name() string { return v.name } @@ -969,7 +1068,7 @@ func (v *variable) shouldApply() bool { return v.existing == nil || v.existing.Description != v.Description || v.existing.Arguments == nil || - v.existing.Arguments.Type != v.Type + !reflect.DeepEqual(v.existing.Arguments, v.influxVarArgs()) } func (v *variable) summarize() SummaryVariable { @@ -1037,6 +1136,16 @@ func (v *variable) valid() []validationErr { return failures } +type mapperVariables []*variable + +func (m mapperVariables) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperVariables) Len() int { + return len(m) +} + const ( fieldDashCharts = "charts" ) @@ -1055,6 +1164,10 @@ func (d *dashboard) ID() influxdb.ID { return d.id } +func (d *dashboard) Labels() []*label { + return d.labels +} + func (d *dashboard) Name() string { return d.name } @@ -1087,6 +1200,16 @@ func (d *dashboard) summarize() SummaryDashboard { return iDash } +type mapperDashboards []*dashboard + +func (m mapperDashboards) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperDashboards) Len() int { + return len(m) +} + const ( fieldChartAxes = "axes" fieldChartBinCount = "binCount" diff --git a/pkger/service.go b/pkger/service.go index 3daaf4ba46..860fc9363a 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -26,12 +26,13 @@ type SVC interface { } type serviceOpt struct { - logger *zap.Logger - labelSVC influxdb.LabelService - bucketSVC influxdb.BucketService - dashSVC influxdb.DashboardService - teleSVC influxdb.TelegrafConfigStore - varSVC influxdb.VariableService + logger *zap.Logger + labelSVC influxdb.LabelService + bucketSVC influxdb.BucketService + dashSVC influxdb.DashboardService + endpointSVC influxdb.NotificationEndpointService + teleSVC influxdb.TelegrafConfigStore + varSVC influxdb.VariableService applyReqLimit int } @@ -60,6 +61,13 @@ func WithDashboardSVC(dashSVC influxdb.DashboardService) ServiceSetterFn { } } +// WithNoticationEndpointSVC sets the endpoint notification service. +func WithNoticationEndpointSVC(endpointSVC influxdb.NotificationEndpointService) ServiceSetterFn { + return func(opt *serviceOpt) { + opt.endpointSVC = endpointSVC + } +} + // WithLabelSVC sets the label service. func WithLabelSVC(labelSVC influxdb.LabelService) ServiceSetterFn { return func(opt *serviceOpt) { @@ -95,11 +103,12 @@ func WithApplyReqLimit(limit int) ServiceSetterFn { type Service struct { log *zap.Logger - labelSVC influxdb.LabelService - bucketSVC influxdb.BucketService - dashSVC influxdb.DashboardService - teleSVC influxdb.TelegrafConfigStore - varSVC influxdb.VariableService + labelSVC influxdb.LabelService + bucketSVC influxdb.BucketService + dashSVC influxdb.DashboardService + endpointSVC influxdb.NotificationEndpointService + teleSVC influxdb.TelegrafConfigStore + varSVC influxdb.VariableService applyReqLimit int } @@ -119,6 +128,7 @@ func NewService(opts ...ServiceSetterFn) *Service { bucketSVC: opt.bucketSVC, labelSVC: opt.labelSVC, dashSVC: opt.dashSVC, + endpointSVC: opt.endpointSVC, teleSVC: opt.teleSVC, varSVC: opt.varSVC, applyReqLimit: opt.applyReqLimit, @@ -501,6 +511,11 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ return Summary{}, Diff{}, err } + diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg) + if err != nil { + return Summary{}, Diff{}, err + } + diffVars, err := s.dryRunVariables(ctx, orgID, pkg) if err != nil { return Summary{}, Diff{}, err @@ -517,12 +532,13 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ pkg.isVerified = true diff := Diff{ - Buckets: diffBuckets, - Dashboards: s.dryRunDashboards(ctx, orgID, pkg), - Labels: diffLabels, - LabelMappings: diffLabelMappings, - Telegrafs: s.dryRunTelegraf(ctx, orgID, pkg), - Variables: diffVars, + Buckets: diffBuckets, + Dashboards: s.dryRunDashboards(ctx, orgID, pkg), + Labels: diffLabels, + LabelMappings: diffLabelMappings, + NotificationEndpoints: diffEndpoints, + Telegrafs: s.dryRunTelegraf(ctx, orgID, pkg), + Variables: diffVars, } return pkg.Summary(), diff, parseErr } @@ -555,7 +571,7 @@ func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg return diffs, nil } -func (s *Service) dryRunDashboards(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffDashboard { +func (s *Service) dryRunDashboards(_ context.Context, _ influxdb.ID, pkg *Pkg) []DiffDashboard { var diffs []DiffDashboard for _, d := range pkg.dashboards() { diffs = append(diffs, newDiffDashboard(d)) @@ -595,7 +611,45 @@ func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, pkg *Pkg) return diffs, nil } -func (s *Service) dryRunTelegraf(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffTelegraf { +func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationEndpoint, error) { + existingEndpoints, _, err := s.endpointSVC.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{ + OrgID: &orgID, + }) // grab em all + if err != nil { + return nil, err + } + + mExisting := make(map[string]influxdb.NotificationEndpoint) + for i := range existingEndpoints { + e := existingEndpoints[i] + mExisting[e.GetName()] = e + } + + mExistingToNew := make(map[string]DiffNotificationEndpoint) + endpoints := pkg.notificationEndpoints() + for i := range endpoints { + newEndpoint := endpoints[i] + + var existing influxdb.NotificationEndpoint + if iExisting, ok := mExisting[newEndpoint.Name()]; ok { + newEndpoint.existing = iExisting + existing = iExisting + } + mExistingToNew[newEndpoint.Name()] = newDiffNotificationEndpoint(newEndpoint, existing) + } + + var diffs []DiffNotificationEndpoint + for _, diff := range mExistingToNew { + diffs = append(diffs, diff) + } + sort.Slice(diffs, func(i, j int) bool { + return diffs[i].Name < diffs[j].Name + }) + + return diffs, nil +} + +func (s *Service) dryRunTelegraf(_ context.Context, _ influxdb.ID, pkg *Pkg) []DiffTelegraf { var diffs []DiffTelegraf for _, t := range pkg.telegrafs() { diffs = append(diffs, newDiffTelegraf(t)) @@ -649,82 +703,47 @@ VarLoop: type ( labelMappingDiffFn func(labelID influxdb.ID, labelName string, isNew bool) + labelMappers interface { + Association(i int) labelAssociater + Len() int + } + labelAssociater interface { ID() influxdb.ID + Name() string + Labels() []*label ResourceType() influxdb.ResourceType Exists() bool } ) func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabelMapping, error) { + mappers := []labelMappers{ + mapperBuckets(pkg.buckets()), + mapperDashboards(pkg.mDashboards), + mapperNotificationEndpoints(pkg.notificationEndpoints()), + mapperTelegrafs(pkg.mTelegrafs), + mapperVariables(pkg.variables()), + } + var diffs []DiffLabelMapping - for _, b := range pkg.buckets() { - err := s.dryRunResourceLabelMapping(ctx, b, b.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - if l, ok := pkg.mLabels[labelName]; ok { - l.setMapping(b, !isNew) + for _, mapper := range mappers { + for i := 0; i < mapper.Len(); i++ { + la := mapper.Association(i) + err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelName string, isNew bool) { + pkg.mLabels[labelName].setMapping(la, !isNew) + diffs = append(diffs, DiffLabelMapping{ + IsNew: isNew, + ResType: la.ResourceType(), + ResID: SafeID(la.ID()), + ResName: la.Name(), + LabelID: SafeID(labelID), + LabelName: labelName, + }) + }) + if err != nil { + return nil, err } - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: b.ResourceType(), - ResID: SafeID(b.ID()), - ResName: b.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err - } - } - - for _, d := range pkg.dashboards() { - err := s.dryRunResourceLabelMapping(ctx, d, d.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - pkg.mLabels[labelName].setMapping(d, false) - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: d.ResourceType(), - ResID: SafeID(d.ID()), - ResName: d.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err - } - } - - for _, t := range pkg.telegrafs() { - err := s.dryRunResourceLabelMapping(ctx, t, t.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - pkg.mLabels[labelName].setMapping(t, false) - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: t.ResourceType(), - ResID: SafeID(t.ID()), - ResName: t.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err - } - } - - for _, v := range pkg.variables() { - err := s.dryRunResourceLabelMapping(ctx, v, v.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - pkg.mLabels[labelName].setMapping(v, !isNew) - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: v.ResourceType(), - ResID: SafeID(v.ID()), - ResName: v.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err } } @@ -749,9 +768,9 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe return diffs, nil } -func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, labels []*label, mappingFn labelMappingDiffFn) error { +func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, mappingFn labelMappingDiffFn) error { if !la.Exists() { - for _, l := range labels { + for _, l := range la.Labels() { mappingFn(l.ID(), l.Name(), true) } return nil @@ -770,7 +789,7 @@ func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssoci return err } - pkgLabels := labelSlcToMap(labels) + pkgLabels := labelSlcToMap(la.Labels()) for _, l := range existingLabels { // should ignore any labels that are not specified in pkg mappingFn(l.ID, l.Name, false) @@ -1275,7 +1294,7 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie err := s.labelSVC.CreateLabelMapping(ctx, &mapping.LabelMapping) if err != nil { return &applyErrBody{ - name: fmt.Sprintf("%s:%s", mapping.ResourceID, mapping.LabelID), + name: fmt.Sprintf("%s:%s:%s", mapping.ResourceType, mapping.ResourceID, mapping.LabelID), msg: err.Error(), } } diff --git a/pkger/service_test.go b/pkger/service_test.go index a47eb83bd5..31af53127c 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/mock" + "github.com/influxdata/influxdb/notification/endpoint" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -18,11 +19,12 @@ import ( func TestService(t *testing.T) { newTestService := func(opts ...ServiceSetterFn) *Service { opt := serviceOpt{ - bucketSVC: mock.NewBucketService(), - dashSVC: mock.NewDashboardService(), - labelSVC: mock.NewLabelService(), - teleSVC: mock.NewTelegrafConfigStore(), - varSVC: mock.NewVariableService(), + bucketSVC: mock.NewBucketService(), + dashSVC: mock.NewDashboardService(), + labelSVC: mock.NewLabelService(), + endpointSVC: mock.NewNotificationEndpointService(), + teleSVC: mock.NewTelegrafConfigStore(), + varSVC: mock.NewVariableService(), } for _, o := range opts { o(&opt) @@ -32,6 +34,7 @@ func TestService(t *testing.T) { WithBucketSVC(opt.bucketSVC), WithDashboardSVC(opt.dashSVC), WithLabelSVC(opt.labelSVC), + WithNoticationEndpointSVC(opt.endpointSVC), WithTelegrafSVC(opt.teleSVC), WithVariableSVC(opt.varSVC), ) @@ -40,7 +43,7 @@ func TestService(t *testing.T) { t.Run("DryRun", func(t *testing.T) { t.Run("buckets", func(t *testing.T) { t.Run("single bucket updated", func(t *testing.T) { - testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/bucket.yml", func(t *testing.T, pkg *Pkg) { fakeBktSVC := mock.NewBucketService() fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) { return &influxdb.Bucket{ @@ -51,7 +54,7 @@ func TestService(t *testing.T) { RetentionPeriod: 30 * time.Hour, }, nil } - svc := newTestService(WithBucketSVC(fakeBktSVC), WithLabelSVC(mock.NewLabelService())) + svc := newTestService(WithBucketSVC(fakeBktSVC)) _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) require.NoError(t, err) @@ -75,12 +78,12 @@ func TestService(t *testing.T) { }) t.Run("single bucket new", func(t *testing.T) { - testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/bucket.json", func(t *testing.T, pkg *Pkg) { fakeBktSVC := mock.NewBucketService() fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) { return nil, errors.New("not found") } - svc := newTestService(WithBucketSVC(fakeBktSVC), WithLabelSVC(mock.NewLabelService())) + svc := newTestService(WithBucketSVC(fakeBktSVC)) _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) require.NoError(t, err) @@ -101,7 +104,7 @@ func TestService(t *testing.T) { t.Run("labels", func(t *testing.T) { t.Run("two labels updated", func(t *testing.T) { - testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/label.json", func(t *testing.T, pkg *Pkg) { fakeLabelSVC := mock.NewLabelService() fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) { return []*influxdb.Label{ @@ -144,7 +147,7 @@ func TestService(t *testing.T) { }) t.Run("two labels created", func(t *testing.T) { - testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/label.yml", func(t *testing.T, pkg *Pkg) { fakeLabelSVC := mock.NewLabelService() fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) { return nil, errors.New("no labels found") @@ -173,6 +176,69 @@ func TestService(t *testing.T) { }) }) + t.Run("notification endpoints", func(t *testing.T) { + testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) { + fakeEndpointSVC := mock.NewNotificationEndpointService() + existing := &endpoint.HTTP{ + Base: endpoint.Base{ + ID: 1, + Name: "http_none_auth_notification_endpoint", + Description: "old desc", + Status: influxdb.TaskStatusInactive, + }, + Method: "POST", + AuthMethod: "none", + URL: "https://www.example.com/endpoint/old", + } + fakeEndpointSVC.FindNotificationEndpointsF = func(ctx context.Context, f influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) { + return []influxdb.NotificationEndpoint{existing}, 1, nil + } + + svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC)) + + _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) + require.NoError(t, err) + + require.Len(t, diff.NotificationEndpoints, 5) + + var ( + newEndpoints []DiffNotificationEndpoint + existingEndpoints []DiffNotificationEndpoint + ) + for _, e := range diff.NotificationEndpoints { + if e.Old != nil { + existingEndpoints = append(existingEndpoints, e) + continue + } + newEndpoints = append(newEndpoints, e) + } + require.Len(t, newEndpoints, 4) + require.Len(t, existingEndpoints, 1) + + expected := DiffNotificationEndpoint{ + ID: SafeID(1), + Name: "http_none_auth_notification_endpoint", + Old: &DiffNotificationEndpointValues{ + NotificationEndpoint: existing, + }, + New: DiffNotificationEndpointValues{ + NotificationEndpoint: &endpoint.HTTP{ + Base: endpoint.Base{ + ID: 1, + Name: "http_none_auth_notification_endpoint", + Description: "http none auth desc", + Status: influxdb.TaskStatusActive, + }, + AuthMethod: "none", + Method: "POST", + URL: "https://www.example.com/endpoint/noneauth", + }, + }, + } + assert.Equal(t, expected, existingEndpoints[0]) + }) + }) + t.Run("variables", func(t *testing.T) { testfileRunner(t, "testdata/variables", func(t *testing.T, pkg *Pkg) { fakeVarSVC := mock.NewVariableService() @@ -185,8 +251,7 @@ func TestService(t *testing.T) { }, }, nil } - fakeLabelSVC := mock.NewLabelService() // ignore mappings for now - svc := newTestService(WithLabelSVC(fakeLabelSVC), WithVariableSVC(fakeVarSVC)) + svc := newTestService(WithVariableSVC(fakeVarSVC)) _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) require.NoError(t, err)