feat(pkger): add notification endpoint dry run functionality

pull/16209/head
Johnny Steenbergen 2019-12-10 13:35:23 -08:00 committed by Johnny Steenbergen
parent b5ccad3c07
commit 69d7eb4455
7 changed files with 398 additions and 138 deletions

View File

@ -840,6 +840,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)),
pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)),
pkger.WithLabelSVC(authorizer.NewLabelService(b.LabelService)),
pkger.WithNoticationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, b.UserResourceMappingService, b.OrganizationService)),
pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)),
pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)),
)

View File

@ -344,6 +344,11 @@ func (tl *TestLauncher) LabelService(tb testing.TB) *http.LabelService {
return &http.LabelService{Client: tl.HTTPClient(tb)}
}
func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.NotificationEndpointService {
tb.Helper()
return http.NewNotificationEndpointService(tl.HTTPClient(tb))
}
func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService {
tb.Helper()
return http.NewTelegrafService(tl.HTTPClient(tb))

View File

@ -3,7 +3,6 @@ package launcher_test
import (
"context"
"errors"
"sync"
"testing"
"time"
@ -24,6 +23,7 @@ func TestLauncher_Pkger(t *testing.T) {
pkger.WithBucketSVC(l.BucketService(t)),
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -51,6 +51,7 @@ func TestLauncher_Pkger(t *testing.T) {
LabelService: l.LabelService(t),
killCount: 2, // hits error on 3rd attempt at creating a mapping
}),
pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -78,6 +79,18 @@ func TestLauncher_Pkger(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, dashs)
endpoints, _, err := l.NotificationEndpointService(t).FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{
OrgID: &l.Org.ID,
})
require.NoError(t, err)
assert.Empty(t, endpoints)
teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{
OrgID: &l.Org.ID,
})
require.NoError(t, err)
assert.Empty(t, teles)
vars, err := l.VariableService(t).FindVariables(ctx, influxdb.VariableFilter{OrganizationID: &l.Org.ID})
require.NoError(t, err)
assert.Empty(t, vars)
@ -131,6 +144,8 @@ func TestLauncher_Pkger(t *testing.T) {
assert.True(t, diffVars[0].IsNew())
require.Len(t, diff.Dashboards, 1)
require.Len(t, diff.NotificationEndpoints, 1)
require.Len(t, diff.Telegrafs, 1)
labels := sum.Labels
require.Len(t, labels, 1)
@ -147,6 +162,18 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1")
endpoints := sum.NotificationEndpoints
require.Len(t, endpoints, 1)
assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].GetName())
assert.Equal(t, "http none auth desc", endpoints[0].GetDescription())
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
teles := sum.TelegrafConfigs
require.Len(t, teles, 1)
assert.Equal(t, "first_tele_config", teles[0].Name)
assert.Equal(t, "desc", teles[0].Description)
hasLabelAssociations(t, teles[0].LabelAssociations, 1, "label_1")
vars := sum.Variables
require.Len(t, vars, 1)
assert.Equal(t, "var_query_1", vars[0].Name)
@ -167,25 +194,25 @@ func TestLauncher_Pkger(t *testing.T) {
labels := sum1.Labels
require.Len(t, labels, 1)
assert.NotEqual(t, influxdb.ID(0), labels[0].ID)
assert.NotZero(t, labels[0].ID)
assert.Equal(t, "label_1", labels[0].Name)
bkts := sum1.Buckets
require.Len(t, bkts, 1)
assert.NotEqual(t, influxdb.ID(0), bkts[0].ID)
assert.NotZero(t, bkts[0].ID)
assert.Equal(t, "rucket_1", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 1, "label_1")
dashs := sum1.Dashboards
require.Len(t, dashs, 1)
assert.NotEqual(t, influxdb.ID(0), dashs[0].ID)
assert.NotZero(t, dashs[0].ID)
assert.Equal(t, "dash_1", dashs[0].Name)
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1")
vars := sum1.Variables
require.Len(t, vars, 1)
assert.NotEqual(t, influxdb.ID(0), vars[0].ID)
assert.NotZero(t, vars[0].ID)
assert.Equal(t, "var_query_1", vars[0].Name)
hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1")
varArgs := vars[0].Arguments
@ -328,6 +355,7 @@ func TestLauncher_Pkger(t *testing.T) {
}),
pkger.WithDashboardSVC(l.DashboardService(t)),
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
)
@ -430,6 +458,15 @@ spec:
bucket = "rucket_3"
[[inputs.cpu]]
percpu = true
- kind: NotificationEndpointHTTP
name: http_none_auth_notification_endpoint
type: none
description: http none auth desc
url: https://www.example.com/endpoint/noneauth
status: inactive
associations:
- kind: Label
name: label_1
`
const updatePkgYMLStr = `apiVersion: 0.1.0
@ -477,18 +514,14 @@ func (f *fakeBucketSVC) UpdateBucket(ctx context.Context, id influxdb.ID, upd in
type fakeLabelSVC struct {
influxdb.LabelService
countMu sync.Mutex
callCount int
callCount mock.SafeCount
killCount int
}
func (f *fakeLabelSVC) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error {
f.countMu.Lock()
if f.callCount == f.killCount {
f.countMu.Unlock()
defer f.callCount.IncrFn()()
if f.callCount.Count() == f.killCount {
return errors.New("reached kill count")
}
f.callCount++
f.countMu.Unlock()
return f.LabelService.CreateLabelMapping(ctx, m)
}

View File

@ -609,7 +609,7 @@ var _ influxdb.NotificationEndpointService = (*NotificationEndpointService)(nil)
// FindNotificationEndpointByID returns a single notification endpoint by ID.
func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
var resp notificationEndpointResponse
var resp notificationEndpointDecoder
err := s.Client.
Get(prefixNotificationEndpoints, id.String()).
DecodeJSON(&resp).
@ -617,7 +617,7 @@ func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.C
if err != nil {
return nil, err
}
return resp.NotificationEndpoint, nil
return resp.endpoint, nil
}
// FindNotificationEndpoints returns a list of notification endpoints that match filter and the total count of matching notification endpoints.
@ -634,7 +634,9 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont
params = append(params, [2]string{"org", *filter.Org})
}
var resp notificationEndpointsResponse
var resp struct {
Endpoints []notificationEndpointDecoder `json:"notificationEndpoints"`
}
err := s.Client.
Get(prefixNotificationEndpoints).
QueryParams(params...).
@ -645,11 +647,10 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont
}
var endpoints []influxdb.NotificationEndpoint
for _, e := range resp.NotificationEndpoints {
endpoints = append(endpoints, e.NotificationEndpoint)
for _, e := range resp.Endpoints {
endpoints = append(endpoints, e.endpoint)
}
return endpoints, len(resp.NotificationEndpoints), nil
return endpoints, len(endpoints), nil
}
// CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier.
@ -658,7 +659,7 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont
func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, ne influxdb.NotificationEndpoint, userID influxdb.ID) error {
// userID is ignored here since server reads it off
// the token/auth. its a nothing burger here
var resp notificationEndpointResponse
var resp notificationEndpointDecoder
err := s.Client.
Post(httpc.BodyJSON(ne), prefixNotificationEndpoints).
DecodeJSON(&resp).
@ -667,15 +668,15 @@ func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Con
return err
}
// :sadpanda:
ne.SetID(resp.GetID())
ne.SetOrgID(resp.GetOrgID())
ne.SetID(resp.endpoint.GetID())
ne.SetOrgID(resp.endpoint.GetOrgID())
return nil
}
// UpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
var resp notificationEndpointResponse
var resp notificationEndpointDecoder
err := s.Client.
Put(httpc.BodyJSON(nr), prefixNotificationEndpoints, id.String()).
DecodeJSON(&resp).
@ -683,7 +684,7 @@ func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Con
if err != nil {
return nil, err
}
return resp.NotificationEndpoint, nil
return resp.endpoint, nil
}
// PatchNotificationEndpoint updates a single notification endpoint with changeset.
@ -693,7 +694,7 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont
return nil, err
}
var resp notificationEndpointResponse
var resp notificationEndpointDecoder
err := s.Client.
Patch(httpc.BodyJSON(upd), prefixNotificationEndpoints, id.String()).
DecodeJSON(&resp).
@ -701,10 +702,23 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont
if err != nil {
return nil, err
}
return resp.NotificationEndpoint, nil
return resp.endpoint, nil
}
// DeleteNotificationEndpoint removes a notification endpoint by ID, returns secret fields, orgID for further deletion.
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) {
panic("not implemented")
}
type notificationEndpointDecoder struct {
endpoint influxdb.NotificationEndpoint
}
func (n *notificationEndpointDecoder) UnmarshalJSON(b []byte) error {
newEndpoint, err := endpoint.UnmarshalJSON(b)
if err != nil {
return err
}
n.endpoint = newEndpoint
return nil
}

View File

@ -19,6 +19,7 @@ const (
KindBucket Kind = "bucket"
KindDashboard Kind = "dashboard"
KindLabel Kind = "label"
KindNotificationEndpoint Kind = "notificationendpoint"
KindNotificationEndpointPagerDuty Kind = "notificationendpointpagerduty"
KindNotificationEndpointHTTP Kind = "notificationendpointhttp"
KindNotificationEndpointSlack Kind = "notificationendpointslack"
@ -79,7 +80,8 @@ func (k Kind) ResourceType() influxdb.ResourceType {
return influxdb.DashboardsResourceType
case KindLabel:
return influxdb.LabelsResourceType
case KindNotificationEndpointHTTP,
case KindNotificationEndpoint,
KindNotificationEndpointHTTP,
KindNotificationEndpointPagerDuty,
KindNotificationEndpointSlack:
return influxdb.NotificationEndpointResourceType
@ -133,12 +135,13 @@ type Metadata struct {
// Diff is the result of a service DryRun call. The diff outlines
// what is new and or updated from the current state of the platform.
type Diff struct {
Buckets []DiffBucket `json:"buckets"`
Dashboards []DiffDashboard `json:"dashboards"`
Labels []DiffLabel `json:"labels"`
LabelMappings []DiffLabelMapping `json:"labelMappings"`
Telegrafs []DiffTelegraf `json:"telegrafConfigs"`
Variables []DiffVariable `json:"variables"`
Buckets []DiffBucket `json:"buckets"`
Dashboards []DiffDashboard `json:"dashboards"`
Labels []DiffLabel `json:"labels"`
LabelMappings []DiffLabelMapping `json:"labelMappings"`
NotificationEndpoints []DiffNotificationEndpoint `json:"notificationEndpoints"`
Telegrafs []DiffTelegraf `json:"telegrafConfigs"`
Variables []DiffVariable `json:"variables"`
}
// HasConflicts provides a binary t/f if there are any changes within package
@ -291,6 +294,35 @@ type DiffLabelMapping struct {
LabelName string `json:"labelName"`
}
// DiffNotificationEndpointValues are the varying values for a notification endpoint.
type DiffNotificationEndpointValues struct {
influxdb.NotificationEndpoint
}
// DiffNotificationEndpoint is a diff of an individual notification endpoint.
type DiffNotificationEndpoint struct {
ID SafeID `json:"id"`
Name string `json:"name"`
New DiffNotificationEndpointValues `json:"new"`
Old *DiffNotificationEndpointValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil
}
func newDiffNotificationEndpoint(ne *notificationEndpoint, i influxdb.NotificationEndpoint) DiffNotificationEndpoint {
diff := DiffNotificationEndpoint{
Name: ne.Name(),
New: DiffNotificationEndpointValues{
NotificationEndpoint: ne.summarize().NotificationEndpoint,
},
}
if i != nil {
diff.ID = SafeID(i.GetID())
diff.Old = &DiffNotificationEndpointValues{
NotificationEndpoint: i,
}
}
return diff
}
// DiffTelegraf is a diff of an individual telegraf.
type DiffTelegraf struct {
influxdb.TelegrafConfig
@ -488,6 +520,10 @@ func (b *bucket) ID() influxdb.ID {
return b.id
}
func (b *bucket) Labels() []*label {
return b.labels
}
func (b *bucket) Name() string {
return b.name
}
@ -524,6 +560,16 @@ func (b *bucket) shouldApply() bool {
b.RetentionRules.RP() != b.existing.RetentionPeriod
}
type mapperBuckets []*bucket
func (b mapperBuckets) Association(i int) labelAssociater {
return b[i]
}
func (b mapperBuckets) Len() int {
return len(b)
}
const (
retentionRuleTypeExpire = "expire"
)
@ -765,6 +811,8 @@ const (
type notificationEndpoint struct {
kind notificationKind
id influxdb.ID
OrgID influxdb.ID
name string
description string
password string
@ -776,6 +824,23 @@ type notificationEndpoint struct {
username string
labels sortedLabels
existing influxdb.NotificationEndpoint
}
func (n *notificationEndpoint) Exists() bool {
return n.existing != nil
}
func (n *notificationEndpoint) ID() influxdb.ID {
if n.existing != nil {
return n.existing.GetID()
}
return n.id
}
func (n *notificationEndpoint) Labels() []*label {
return n.labels
}
func (n *notificationEndpoint) Name() string {
@ -786,12 +851,18 @@ func (n *notificationEndpoint) ResourceType() influxdb.ResourceType {
return KindNotificationEndpointSlack.ResourceType()
}
func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
base := endpoint.Base{
func (n *notificationEndpoint) base() endpoint.Base {
return endpoint.Base{
ID: n.ID(),
OrgID: n.OrgID,
Name: n.Name(),
Description: n.description,
Status: influxdb.TaskStatusActive,
}
}
func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
base := n.base()
if n.status != "" {
base.Status = influxdb.Status(n.status)
}
@ -891,6 +962,16 @@ func (n *notificationEndpoint) valid() []validationErr {
return failures
}
type mapperNotificationEndpoints []*notificationEndpoint
func (n mapperNotificationEndpoints) Association(i int) labelAssociater {
return n[i]
}
func (n mapperNotificationEndpoints) Len() int {
return len(n)
}
const (
fieldTelegrafConfig = "config"
)
@ -905,6 +986,10 @@ func (t *telegraf) ID() influxdb.ID {
return t.config.ID
}
func (t *telegraf) Labels() []*label {
return t.labels
}
func (t *telegraf) Name() string {
return t.config.Name
}
@ -924,6 +1009,16 @@ func (t *telegraf) summarize() SummaryTelegraf {
}
}
type mapperTelegrafs []*telegraf
func (m mapperTelegrafs) Association(i int) labelAssociater {
return m[i]
}
func (m mapperTelegrafs) Len() int {
return len(m)
}
const (
fieldArgTypeConstant = "constant"
fieldArgTypeMap = "map"
@ -957,6 +1052,10 @@ func (v *variable) Exists() bool {
return v.existing != nil
}
func (v *variable) Labels() []*label {
return v.labels
}
func (v *variable) Name() string {
return v.name
}
@ -969,7 +1068,7 @@ func (v *variable) shouldApply() bool {
return v.existing == nil ||
v.existing.Description != v.Description ||
v.existing.Arguments == nil ||
v.existing.Arguments.Type != v.Type
!reflect.DeepEqual(v.existing.Arguments, v.influxVarArgs())
}
func (v *variable) summarize() SummaryVariable {
@ -1037,6 +1136,16 @@ func (v *variable) valid() []validationErr {
return failures
}
type mapperVariables []*variable
func (m mapperVariables) Association(i int) labelAssociater {
return m[i]
}
func (m mapperVariables) Len() int {
return len(m)
}
const (
fieldDashCharts = "charts"
)
@ -1055,6 +1164,10 @@ func (d *dashboard) ID() influxdb.ID {
return d.id
}
func (d *dashboard) Labels() []*label {
return d.labels
}
func (d *dashboard) Name() string {
return d.name
}
@ -1087,6 +1200,16 @@ func (d *dashboard) summarize() SummaryDashboard {
return iDash
}
type mapperDashboards []*dashboard
func (m mapperDashboards) Association(i int) labelAssociater {
return m[i]
}
func (m mapperDashboards) Len() int {
return len(m)
}
const (
fieldChartAxes = "axes"
fieldChartBinCount = "binCount"

View File

@ -26,12 +26,13 @@ type SVC interface {
}
type serviceOpt struct {
logger *zap.Logger
labelSVC influxdb.LabelService
bucketSVC influxdb.BucketService
dashSVC influxdb.DashboardService
teleSVC influxdb.TelegrafConfigStore
varSVC influxdb.VariableService
logger *zap.Logger
labelSVC influxdb.LabelService
bucketSVC influxdb.BucketService
dashSVC influxdb.DashboardService
endpointSVC influxdb.NotificationEndpointService
teleSVC influxdb.TelegrafConfigStore
varSVC influxdb.VariableService
applyReqLimit int
}
@ -60,6 +61,13 @@ func WithDashboardSVC(dashSVC influxdb.DashboardService) ServiceSetterFn {
}
}
// WithNoticationEndpointSVC sets the endpoint notification service.
func WithNoticationEndpointSVC(endpointSVC influxdb.NotificationEndpointService) ServiceSetterFn {
return func(opt *serviceOpt) {
opt.endpointSVC = endpointSVC
}
}
// WithLabelSVC sets the label service.
func WithLabelSVC(labelSVC influxdb.LabelService) ServiceSetterFn {
return func(opt *serviceOpt) {
@ -95,11 +103,12 @@ func WithApplyReqLimit(limit int) ServiceSetterFn {
type Service struct {
log *zap.Logger
labelSVC influxdb.LabelService
bucketSVC influxdb.BucketService
dashSVC influxdb.DashboardService
teleSVC influxdb.TelegrafConfigStore
varSVC influxdb.VariableService
labelSVC influxdb.LabelService
bucketSVC influxdb.BucketService
dashSVC influxdb.DashboardService
endpointSVC influxdb.NotificationEndpointService
teleSVC influxdb.TelegrafConfigStore
varSVC influxdb.VariableService
applyReqLimit int
}
@ -119,6 +128,7 @@ func NewService(opts ...ServiceSetterFn) *Service {
bucketSVC: opt.bucketSVC,
labelSVC: opt.labelSVC,
dashSVC: opt.dashSVC,
endpointSVC: opt.endpointSVC,
teleSVC: opt.teleSVC,
varSVC: opt.varSVC,
applyReqLimit: opt.applyReqLimit,
@ -501,6 +511,11 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ
return Summary{}, Diff{}, err
}
diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg)
if err != nil {
return Summary{}, Diff{}, err
}
diffVars, err := s.dryRunVariables(ctx, orgID, pkg)
if err != nil {
return Summary{}, Diff{}, err
@ -517,12 +532,13 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ
pkg.isVerified = true
diff := Diff{
Buckets: diffBuckets,
Dashboards: s.dryRunDashboards(ctx, orgID, pkg),
Labels: diffLabels,
LabelMappings: diffLabelMappings,
Telegrafs: s.dryRunTelegraf(ctx, orgID, pkg),
Variables: diffVars,
Buckets: diffBuckets,
Dashboards: s.dryRunDashboards(ctx, orgID, pkg),
Labels: diffLabels,
LabelMappings: diffLabelMappings,
NotificationEndpoints: diffEndpoints,
Telegrafs: s.dryRunTelegraf(ctx, orgID, pkg),
Variables: diffVars,
}
return pkg.Summary(), diff, parseErr
}
@ -555,7 +571,7 @@ func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg
return diffs, nil
}
func (s *Service) dryRunDashboards(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffDashboard {
func (s *Service) dryRunDashboards(_ context.Context, _ influxdb.ID, pkg *Pkg) []DiffDashboard {
var diffs []DiffDashboard
for _, d := range pkg.dashboards() {
diffs = append(diffs, newDiffDashboard(d))
@ -595,7 +611,45 @@ func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, pkg *Pkg)
return diffs, nil
}
func (s *Service) dryRunTelegraf(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffTelegraf {
func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationEndpoint, error) {
existingEndpoints, _, err := s.endpointSVC.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{
OrgID: &orgID,
}) // grab em all
if err != nil {
return nil, err
}
mExisting := make(map[string]influxdb.NotificationEndpoint)
for i := range existingEndpoints {
e := existingEndpoints[i]
mExisting[e.GetName()] = e
}
mExistingToNew := make(map[string]DiffNotificationEndpoint)
endpoints := pkg.notificationEndpoints()
for i := range endpoints {
newEndpoint := endpoints[i]
var existing influxdb.NotificationEndpoint
if iExisting, ok := mExisting[newEndpoint.Name()]; ok {
newEndpoint.existing = iExisting
existing = iExisting
}
mExistingToNew[newEndpoint.Name()] = newDiffNotificationEndpoint(newEndpoint, existing)
}
var diffs []DiffNotificationEndpoint
for _, diff := range mExistingToNew {
diffs = append(diffs, diff)
}
sort.Slice(diffs, func(i, j int) bool {
return diffs[i].Name < diffs[j].Name
})
return diffs, nil
}
func (s *Service) dryRunTelegraf(_ context.Context, _ influxdb.ID, pkg *Pkg) []DiffTelegraf {
var diffs []DiffTelegraf
for _, t := range pkg.telegrafs() {
diffs = append(diffs, newDiffTelegraf(t))
@ -649,82 +703,47 @@ VarLoop:
type (
labelMappingDiffFn func(labelID influxdb.ID, labelName string, isNew bool)
labelMappers interface {
Association(i int) labelAssociater
Len() int
}
labelAssociater interface {
ID() influxdb.ID
Name() string
Labels() []*label
ResourceType() influxdb.ResourceType
Exists() bool
}
)
func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabelMapping, error) {
mappers := []labelMappers{
mapperBuckets(pkg.buckets()),
mapperDashboards(pkg.mDashboards),
mapperNotificationEndpoints(pkg.notificationEndpoints()),
mapperTelegrafs(pkg.mTelegrafs),
mapperVariables(pkg.variables()),
}
var diffs []DiffLabelMapping
for _, b := range pkg.buckets() {
err := s.dryRunResourceLabelMapping(ctx, b, b.labels, func(labelID influxdb.ID, labelName string, isNew bool) {
if l, ok := pkg.mLabels[labelName]; ok {
l.setMapping(b, !isNew)
for _, mapper := range mappers {
for i := 0; i < mapper.Len(); i++ {
la := mapper.Association(i)
err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelName string, isNew bool) {
pkg.mLabels[labelName].setMapping(la, !isNew)
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: la.ResourceType(),
ResID: SafeID(la.ID()),
ResName: la.Name(),
LabelID: SafeID(labelID),
LabelName: labelName,
})
})
if err != nil {
return nil, err
}
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: b.ResourceType(),
ResID: SafeID(b.ID()),
ResName: b.Name(),
LabelID: SafeID(labelID),
LabelName: labelName,
})
})
if err != nil {
return nil, err
}
}
for _, d := range pkg.dashboards() {
err := s.dryRunResourceLabelMapping(ctx, d, d.labels, func(labelID influxdb.ID, labelName string, isNew bool) {
pkg.mLabels[labelName].setMapping(d, false)
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: d.ResourceType(),
ResID: SafeID(d.ID()),
ResName: d.Name(),
LabelID: SafeID(labelID),
LabelName: labelName,
})
})
if err != nil {
return nil, err
}
}
for _, t := range pkg.telegrafs() {
err := s.dryRunResourceLabelMapping(ctx, t, t.labels, func(labelID influxdb.ID, labelName string, isNew bool) {
pkg.mLabels[labelName].setMapping(t, false)
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: t.ResourceType(),
ResID: SafeID(t.ID()),
ResName: t.Name(),
LabelID: SafeID(labelID),
LabelName: labelName,
})
})
if err != nil {
return nil, err
}
}
for _, v := range pkg.variables() {
err := s.dryRunResourceLabelMapping(ctx, v, v.labels, func(labelID influxdb.ID, labelName string, isNew bool) {
pkg.mLabels[labelName].setMapping(v, !isNew)
diffs = append(diffs, DiffLabelMapping{
IsNew: isNew,
ResType: v.ResourceType(),
ResID: SafeID(v.ID()),
ResName: v.Name(),
LabelID: SafeID(labelID),
LabelName: labelName,
})
})
if err != nil {
return nil, err
}
}
@ -749,9 +768,9 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe
return diffs, nil
}
func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, labels []*label, mappingFn labelMappingDiffFn) error {
func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, mappingFn labelMappingDiffFn) error {
if !la.Exists() {
for _, l := range labels {
for _, l := range la.Labels() {
mappingFn(l.ID(), l.Name(), true)
}
return nil
@ -770,7 +789,7 @@ func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssoci
return err
}
pkgLabels := labelSlcToMap(labels)
pkgLabels := labelSlcToMap(la.Labels())
for _, l := range existingLabels {
// should ignore any labels that are not specified in pkg
mappingFn(l.ID, l.Name, false)
@ -1275,7 +1294,7 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie
err := s.labelSVC.CreateLabelMapping(ctx, &mapping.LabelMapping)
if err != nil {
return &applyErrBody{
name: fmt.Sprintf("%s:%s", mapping.ResourceID, mapping.LabelID),
name: fmt.Sprintf("%s:%s:%s", mapping.ResourceType, mapping.ResourceID, mapping.LabelID),
msg: err.Error(),
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/endpoint"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
@ -18,11 +19,12 @@ import (
func TestService(t *testing.T) {
newTestService := func(opts ...ServiceSetterFn) *Service {
opt := serviceOpt{
bucketSVC: mock.NewBucketService(),
dashSVC: mock.NewDashboardService(),
labelSVC: mock.NewLabelService(),
teleSVC: mock.NewTelegrafConfigStore(),
varSVC: mock.NewVariableService(),
bucketSVC: mock.NewBucketService(),
dashSVC: mock.NewDashboardService(),
labelSVC: mock.NewLabelService(),
endpointSVC: mock.NewNotificationEndpointService(),
teleSVC: mock.NewTelegrafConfigStore(),
varSVC: mock.NewVariableService(),
}
for _, o := range opts {
o(&opt)
@ -32,6 +34,7 @@ func TestService(t *testing.T) {
WithBucketSVC(opt.bucketSVC),
WithDashboardSVC(opt.dashSVC),
WithLabelSVC(opt.labelSVC),
WithNoticationEndpointSVC(opt.endpointSVC),
WithTelegrafSVC(opt.teleSVC),
WithVariableSVC(opt.varSVC),
)
@ -40,7 +43,7 @@ func TestService(t *testing.T) {
t.Run("DryRun", func(t *testing.T) {
t.Run("buckets", func(t *testing.T) {
t.Run("single bucket updated", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
testfileRunner(t, "testdata/bucket.yml", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
return &influxdb.Bucket{
@ -51,7 +54,7 @@ func TestService(t *testing.T) {
RetentionPeriod: 30 * time.Hour,
}, nil
}
svc := newTestService(WithBucketSVC(fakeBktSVC), WithLabelSVC(mock.NewLabelService()))
svc := newTestService(WithBucketSVC(fakeBktSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
@ -75,12 +78,12 @@ func TestService(t *testing.T) {
})
t.Run("single bucket new", func(t *testing.T) {
testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) {
testfileRunner(t, "testdata/bucket.json", func(t *testing.T, pkg *Pkg) {
fakeBktSVC := mock.NewBucketService()
fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
return nil, errors.New("not found")
}
svc := newTestService(WithBucketSVC(fakeBktSVC), WithLabelSVC(mock.NewLabelService()))
svc := newTestService(WithBucketSVC(fakeBktSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
@ -101,7 +104,7 @@ func TestService(t *testing.T) {
t.Run("labels", func(t *testing.T) {
t.Run("two labels updated", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
testfileRunner(t, "testdata/label.json", func(t *testing.T, pkg *Pkg) {
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
return []*influxdb.Label{
@ -144,7 +147,7 @@ func TestService(t *testing.T) {
})
t.Run("two labels created", func(t *testing.T) {
testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) {
testfileRunner(t, "testdata/label.yml", func(t *testing.T, pkg *Pkg) {
fakeLabelSVC := mock.NewLabelService()
fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) {
return nil, errors.New("no labels found")
@ -173,6 +176,69 @@ func TestService(t *testing.T) {
})
})
t.Run("notification endpoints", func(t *testing.T) {
testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) {
fakeEndpointSVC := mock.NewNotificationEndpointService()
existing := &endpoint.HTTP{
Base: endpoint.Base{
ID: 1,
Name: "http_none_auth_notification_endpoint",
Description: "old desc",
Status: influxdb.TaskStatusInactive,
},
Method: "POST",
AuthMethod: "none",
URL: "https://www.example.com/endpoint/old",
}
fakeEndpointSVC.FindNotificationEndpointsF = func(ctx context.Context, f influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
return []influxdb.NotificationEndpoint{existing}, 1, nil
}
svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)
require.Len(t, diff.NotificationEndpoints, 5)
var (
newEndpoints []DiffNotificationEndpoint
existingEndpoints []DiffNotificationEndpoint
)
for _, e := range diff.NotificationEndpoints {
if e.Old != nil {
existingEndpoints = append(existingEndpoints, e)
continue
}
newEndpoints = append(newEndpoints, e)
}
require.Len(t, newEndpoints, 4)
require.Len(t, existingEndpoints, 1)
expected := DiffNotificationEndpoint{
ID: SafeID(1),
Name: "http_none_auth_notification_endpoint",
Old: &DiffNotificationEndpointValues{
NotificationEndpoint: existing,
},
New: DiffNotificationEndpointValues{
NotificationEndpoint: &endpoint.HTTP{
Base: endpoint.Base{
ID: 1,
Name: "http_none_auth_notification_endpoint",
Description: "http none auth desc",
Status: influxdb.TaskStatusActive,
},
AuthMethod: "none",
Method: "POST",
URL: "https://www.example.com/endpoint/noneauth",
},
},
}
assert.Equal(t, expected, existingEndpoints[0])
})
})
t.Run("variables", func(t *testing.T) {
testfileRunner(t, "testdata/variables", func(t *testing.T, pkg *Pkg) {
fakeVarSVC := mock.NewVariableService()
@ -185,8 +251,7 @@ func TestService(t *testing.T) {
},
}, nil
}
fakeLabelSVC := mock.NewLabelService() // ignore mappings for now
svc := newTestService(WithLabelSVC(fakeLabelSVC), WithVariableSVC(fakeVarSVC))
svc := newTestService(WithVariableSVC(fakeVarSVC))
_, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg)
require.NoError(t, err)