feat(pkger): add apply functionality for notification endpoints kind

pull/16209/head
Johnny Steenbergen 2019-12-10 14:51:11 -08:00 committed by Johnny Steenbergen
parent 69d7eb4455
commit 3daaa4d307
13 changed files with 440 additions and 96 deletions

View File

@ -518,6 +518,7 @@ func newPkgerSVC(cliReqOpts httpClientOpts, opts ...pkger.ServiceSetterFn) (pkge
pkger.WithBucketSVC(&ihttp.BucketService{Client: httpClient}),
pkger.WithDashboardSVC(&ihttp.DashboardService{Client: httpClient}),
pkger.WithLabelSVC(&ihttp.LabelService{Client: httpClient}),
pkger.WithNoticationEndpointSVC(ihttp.NewNotificationEndpointService(httpClient)),
pkger.WithTelegrafSVC(ihttp.NewTelegrafService(httpClient)),
pkger.WithVariableSVC(&ihttp.VariableService{Client: httpClient}),
)...,
@ -654,6 +655,18 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) {
})
}
if endpoints := diff.NotificationEndpoints; len(endpoints) > 0 {
headers := []string{"New", "ID", "Name"}
tablePrintFn("NOTIFICATION ENDPOINTS", headers, len(endpoints), func(i int) []string {
v := endpoints[i]
return []string{
boolDiff(v.IsNew()),
v.ID.String(),
v.Name,
}
})
}
if teles := diff.Telegrafs; len(diff.Telegrafs) > 0 {
headers := []string{"New", "Name", "Description"}
tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string {
@ -733,6 +746,19 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) {
})
}
if endpoints := sum.NotificationEndpoints; len(endpoints) > 0 {
headers := []string{"ID", "Name", "Description", "Status"}
tablePrintFn("NOTIFICATION ENDPOINTS", headers, len(endpoints), func(i int) []string {
v := endpoints[i]
return []string{
v.GetID().String(),
v.GetName(),
v.GetDescription(),
string(v.GetStatus()),
}
})
}
if teles := sum.TelegrafConfigs; len(teles) > 0 {
headers := []string{"ID", "Name", "Description"}
tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string {

View File

@ -210,6 +210,22 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "desc1", dashs[0].Description)
hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1")
endpoints := sum1.NotificationEndpoints
require.Len(t, endpoints, 1)
assert.NotZero(t, endpoints[0].GetID())
assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].GetName())
assert.Equal(t, "http none auth desc", endpoints[0].GetDescription())
assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].GetStatus()))
hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1")
teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
assert.NotZero(t, teles[0].ID)
assert.Equal(t, l.Org.ID, teles[0].OrgID)
assert.Equal(t, "first_tele_config", teles[0].Name)
assert.Equal(t, "desc", teles[0].Description)
assert.Len(t, teles[0].Plugins, 2)
vars := sum1.Variables
require.Len(t, vars, 1)
assert.NotZero(t, vars[0].ID)
@ -223,14 +239,6 @@ func TestLauncher_Pkger(t *testing.T) {
Language: "flux",
}, varArgs.Values)
teles := sum1.TelegrafConfigs
require.Len(t, teles, 1)
assert.NotZero(t, teles[0].ID)
assert.Equal(t, l.Org.ID, teles[0].OrgID)
assert.Equal(t, "first_tele_config", teles[0].Name)
assert.Equal(t, "desc", teles[0].Description)
assert.Len(t, teles[0].Plugins, 2)
newSumMapping := func(id influxdb.ID, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping {
return pkger.SummaryLabelMapping{
ResourceName: name,
@ -244,7 +252,7 @@ func TestLauncher_Pkger(t *testing.T) {
}
mappings := sum1.LabelMappings
require.Len(t, mappings, 4)
require.Len(t, mappings, 5)
hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType))
hasMapping(t, mappings, newSumMapping(influxdb.ID(dashs[0].ID), dashs[0].Name, influxdb.DashboardsResourceType))
hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType))
@ -258,6 +266,7 @@ func TestLauncher_Pkger(t *testing.T) {
require.Equal(t, sum1.Buckets, sum2.Buckets)
require.Equal(t, sum1.Labels, sum2.Labels)
require.Equal(t, sum1.NotificationEndpoints, sum2.NotificationEndpoints)
require.Equal(t, sum1.Variables, sum2.Variables)
// dashboards should be new
@ -462,6 +471,7 @@ spec:
name: http_none_auth_notification_endpoint
type: none
description: http none auth desc
method: GET
url: https://www.example.com/endpoint/noneauth
status: inactive
associations:

View File

@ -661,7 +661,7 @@ func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Con
// the token/auth. its a nothing burger here
var resp notificationEndpointDecoder
err := s.Client.
Post(httpc.BodyJSON(ne), prefixNotificationEndpoints).
Post(httpc.BodyJSON(&notificationEndpointEncoder{ne: ne}), prefixNotificationEndpoints).
DecodeJSON(&resp).
Do(ctx)
if err != nil {
@ -675,10 +675,11 @@ func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Con
// UpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, ne influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
// userID is ignored since userID is grabbed off the http auth set on the client
var resp notificationEndpointDecoder
err := s.Client.
Put(httpc.BodyJSON(nr), prefixNotificationEndpoints, id.String()).
Put(httpc.BodyJSON(&notificationEndpointEncoder{ne: ne}), prefixNotificationEndpoints, id.String()).
DecodeJSON(&resp).
Do(ctx)
if err != nil {
@ -706,8 +707,48 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont
}
// DeleteNotificationEndpoint removes a notification endpoint by ID, returns secret fields, orgID for further deletion.
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) {
panic("not implemented")
// TODO: axe this delete design, makes little sense in how its currently being done. Right now, as an http client,
// I am forced to know how the store handles this and then figure out what the server does in between me and that store,
// then see what falls out :flushed... for now returning nothing for secrets, orgID, and only returning an error. This makes
// the code/design smell super obvious imo
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) {
err := s.Client.
Delete(prefixNotificationEndpoints, id.String()).
Do(ctx)
return nil, 0, err
}
type notificationEndpointEncoder struct {
ne influxdb.NotificationEndpoint
}
func (n *notificationEndpointEncoder) MarshalJSON() ([]byte, error) {
b, err := json.Marshal(n.ne)
if err != nil {
return nil, err
}
ughhh := make(map[string]interface{})
if err := json.Unmarshal(b, &ughhh); err != nil {
return nil, err
}
n.ne.BackfillSecretKeys()
// this makes me queezy and altogether sad
fieldMap := map[string]string{
"-password": "password",
"-routing-key": "routingKey",
"-token": "token",
"-username": "username",
}
for _, sec := range n.ne.SecretFields() {
var v string
if sec.Value != nil {
v = *sec.Value
}
ughhh[fieldMap[sec.Key]] = v
}
return json.Marshal(ughhh)
}
type notificationEndpointDecoder struct {

View File

@ -7217,6 +7217,17 @@ components:
type: string
labelID:
type: string
notificationEndpoints:
type: array
items:
allOf:
- $ref: "#/components/schemas/NotificationEndpointDiscrimator"
- type: object
properties:
labelAssociations:
type: array
items:
$ref: "#/components/schemas/Label"
telegrafConfigs:
type: array
items:
@ -7318,6 +7329,19 @@ components:
type: string
labelName:
type: string
notificationEndpoints:
type: array
items:
type: object
properties:
id:
type: string
name:
type: string
new:
$ref: "#/components/schemas/NotificationEndpointDiscrimator"
old:
$ref: "#/components/schemas/NotificationEndpointDiscrimator"
telegrafConfigs:
type: array
items:
@ -10296,6 +10320,7 @@ components:
type: string
NotificationEndpointUpdate:
type: object
properties:
name:
type: string

View File

@ -12,12 +12,18 @@ var _ influxdb.NotificationEndpointService = &NotificationEndpointService{}
type NotificationEndpointService struct {
*OrganizationService
*UserResourceMappingService
FindNotificationEndpointByIDF func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error)
FindNotificationEndpointsF func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error)
CreateNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error
UpdateNotificationEndpointF func(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error)
PatchNotificationEndpointF func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error)
DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error)
FindNotificationEndpointByIDF func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error)
FindNotificationEndpointByIDCalls SafeCount
FindNotificationEndpointsF func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error)
FindNotificationEndpointsCalls SafeCount
CreateNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error
CreateNotificationEndpointCalls SafeCount
UpdateNotificationEndpointF func(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error)
UpdateNotificationEndpointCalls SafeCount
PatchNotificationEndpointF func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error)
PatchNotificationEndpointCalls SafeCount
DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error)
DeleteNotificationEndpointCalls SafeCount
}
func NewNotificationEndpointService() *NotificationEndpointService {
@ -47,33 +53,39 @@ func NewNotificationEndpointService() *NotificationEndpointService {
// FindNotificationEndpointByID returns a single telegraf config by ID.
func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
defer s.FindNotificationEndpointByIDCalls.IncrFn()()
return s.FindNotificationEndpointByIDF(ctx, id)
}
// FindNotificationEndpoints returns a list of notification rules that match filter and the total count of matching notification rules.
// Additional options provide pagination & sorting.
func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
defer s.FindNotificationEndpointsCalls.IncrFn()()
return s.FindNotificationEndpointsF(ctx, filter, opt...)
}
// CreateNotificationEndpoint creates a new notification rule and sets ID with the new identifier.
func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error {
defer s.CreateNotificationEndpointCalls.IncrFn()()
return s.CreateNotificationEndpointF(ctx, nr, userID)
}
// UpdateNotificationEndpoint updates a single notification rule.
// Returns the new notification rule after update.
func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
defer s.UpdateNotificationEndpointCalls.IncrFn()()
return s.UpdateNotificationEndpointF(ctx, id, nr, userID)
}
// PatchNotificationEndpoint updates a single notification rule with changeset.
// Returns the new notification rule after update.
func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
defer s.PatchNotificationEndpointCalls.IncrFn()()
return s.PatchNotificationEndpointF(ctx, id, upd)
}
// DeleteNotificationEndpoint removes a notification rule by ID.
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) {
defer s.DeleteNotificationEndpointCalls.IncrFn()()
return s.DeleteNotificationEndpointF(ctx, id)
}

View File

@ -121,10 +121,9 @@ func (s HTTP) Valid() error {
return nil
}
type httpAlias HTTP
// MarshalJSON implement json.Marshaler interface.
func (s HTTP) MarshalJSON() ([]byte, error) {
type httpAlias HTTP
return json.Marshal(
struct {
httpAlias

View File

@ -45,7 +45,7 @@ type Kind string
// NewKind returns the kind parsed from the provided string.
func NewKind(s string) Kind {
return Kind(strings.TrimSpace(strings.ToLower(s)))
return Kind(normStr(s))
}
// String provides the kind in human readable form.
@ -323,6 +323,12 @@ func newDiffNotificationEndpoint(ne *notificationEndpoint, i influxdb.Notificati
return diff
}
// IsNew indicates if the resource will be new to the platform or if it edits
// an existing resource.
func (d DiffNotificationEndpoint) IsNew() bool {
return d.Old == nil
}
// DiffTelegraf is a diff of an individual telegraf.
type DiffTelegraf struct {
influxdb.TelegrafConfig
@ -760,15 +766,19 @@ func (l *label) properties() map[string]string {
}
}
func (l *label) toInfluxLabel() influxdb.Label {
return influxdb.Label{
ID: l.ID(),
OrgID: l.OrgID,
Name: l.Name(),
Properties: l.properties(),
}
}
func toInfluxLabels(labels ...*label) []influxdb.Label {
var iLabels []influxdb.Label
for _, l := range labels {
iLabels = append(iLabels, influxdb.Label{
ID: l.ID(),
OrgID: l.OrgID,
Name: l.Name(),
Properties: l.properties(),
})
iLabels = append(iLabels, l.toInfluxLabel())
}
return iLabels
}
@ -802,6 +812,7 @@ const (
)
const (
fieldNotificationEndpointHTTPMethod = "method"
fieldNotificationEndpointPassword = "password"
fieldNotificationEndpointRoutingKey = "routingKey"
fieldNotificationEndpointToken = "token"
@ -815,6 +826,7 @@ type notificationEndpoint struct {
OrgID influxdb.ID
name string
description string
method string
password string
routingKey string
status string
@ -869,36 +881,56 @@ func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint {
sum := SummaryNotificationEndpoint{
LabelAssociations: toInfluxLabels(n.labels...),
}
switch n.kind {
case notificationKindHTTP:
e := &endpoint.HTTP{
Base: base,
URL: n.url,
Method: "POST",
Method: n.method,
}
switch {
case n.password == "" && n.username == "" && n.token == "":
switch n.httpType {
case notificationHTTPAuthTypeNone:
e.AuthMethod = notificationHTTPAuthTypeNone
case n.token != "":
case notificationHTTPAuthTypeBearer:
e.AuthMethod = notificationHTTPAuthTypeBearer
e.Token = influxdb.SecretField{Value: &n.token}
default:
e.AuthMethod = notificationHTTPAuthTypeBasic
e.Password = influxdb.SecretField{Value: &n.password}
e.Username = influxdb.SecretField{Value: &n.username}
}
sum.NotificationEndpoint = e
case notificationKindPagerDuty:
sum.NotificationEndpoint = &endpoint.PagerDuty{
Base: base,
ClientURL: n.url,
Base: base,
ClientURL: n.url,
RoutingKey: influxdb.SecretField{Value: &n.routingKey},
}
case notificationKindSlack:
sum.NotificationEndpoint = &endpoint.Slack{
e := &endpoint.Slack{
Base: base,
URL: n.url,
}
if n.token != "" {
e.Token = influxdb.SecretField{Value: &n.token}
}
sum.NotificationEndpoint = e
}
sum.NotificationEndpoint.BackfillSecretKeys()
return sum
}
var validHTTPMethods = map[string]bool{
"DELETE": true,
"GET": true,
"HEAD": true,
"OPTIONS": true,
"PATCH": true,
"POST": true,
"PUT": true,
}
func (n *notificationEndpoint) valid() []validationErr {
var failures []validationErr
if _, err := url.Parse(n.url); err != nil || n.url == "" {
@ -924,6 +956,13 @@ func (n *notificationEndpoint) valid() []validationErr {
})
}
case notificationKindHTTP:
if !validHTTPMethods[n.method] {
failures = append(failures, validationErr{
Field: fieldNotificationEndpointHTTPMethod,
Msg: "http method must be a valid HTTP verb",
})
}
switch n.httpType {
case notificationHTTPAuthTypeBasic:
if n.password == "" {

View File

@ -549,10 +549,11 @@ func (p *Pkg) graphNotificationEndpoints() *parseErr {
kind: nk.notificationKind,
name: r.Name(),
description: r.stringShort(fieldDescription),
httpType: strings.ToLower(r.stringShort(fieldType)),
method: strings.TrimSpace(strings.ToUpper(r.stringShort(fieldNotificationEndpointHTTPMethod))),
httpType: normStr(r.stringShort(fieldType)),
password: r.stringShort(fieldNotificationEndpointPassword),
routingKey: r.stringShort(fieldNotificationEndpointRoutingKey),
status: strings.ToLower(r.stringShort(fieldStatus)),
status: normStr(r.stringShort(fieldStatus)),
token: r.stringShort(fieldNotificationEndpointToken),
url: r.stringShort(fieldNotificationEndpointURL),
username: r.stringShort(fieldNotificationEndpointUsername),
@ -590,9 +591,9 @@ func (p *Pkg) graphVariables() *parseErr {
newVar := &variable{
name: r.Name(),
Description: r.stringShort(fieldDescription),
Type: strings.ToLower(r.stringShort(fieldType)),
Type: normStr(r.stringShort(fieldType)),
Query: strings.TrimSpace(r.stringShort(fieldQuery)),
Language: strings.ToLower(strings.TrimSpace(r.stringShort(fieldLanguage))),
Language: normStr(r.stringShort(fieldLanguage)),
ConstValues: r.slcStr(fieldValues),
MapValues: r.mapStrStr(fieldValues),
}
@ -1226,3 +1227,7 @@ func IsParseErr(err error) bool {
_, ok := err.(*parseErr)
return ok
}
func normStr(s string) string {
return strings.TrimSpace(strings.ToLower(s))
}

View File

@ -2717,6 +2717,8 @@ spec:
URL: "https://www.example.com/endpoint/basicauth",
AuthMethod: "basic",
Method: "POST",
Username: influxdb.SecretField{Key: "-username", Value: strPtr("secret username")},
Password: influxdb.SecretField{Key: "-password", Value: strPtr("secret password")},
},
},
{
@ -2728,7 +2730,8 @@ spec:
},
URL: "https://www.example.com/endpoint/bearerauth",
AuthMethod: "bearer",
Method: "POST",
Method: "PUT",
Token: influxdb.SecretField{Key: "-token", Value: strPtr("secret token")},
},
},
{
@ -2740,7 +2743,7 @@ spec:
},
URL: "https://www.example.com/endpoint/noneauth",
AuthMethod: "none",
Method: "POST",
Method: "GET",
},
},
{
@ -2750,7 +2753,8 @@ spec:
Description: "pager duty desc",
Status: influxdb.TaskStatusActive,
},
ClientURL: "http://localhost:8080/orgs/7167eb6719fa34e5/alert-history",
ClientURL: "http://localhost:8080/orgs/7167eb6719fa34e5/alert-history",
RoutingKey: influxdb.SecretField{Key: "-routing-key", Value: strPtr("secret routing-key")},
},
},
{
@ -2760,7 +2764,8 @@ spec:
Description: "slack desc",
Status: influxdb.TaskStatusActive,
},
URL: "https://hooks.slack.com/services/bip/piddy/boppidy",
URL: "https://hooks.slack.com/services/bip/piddy/boppidy",
Token: influxdb.SecretField{Key: "-token", Value: strPtr("tokenval")},
},
},
}
@ -2846,6 +2851,7 @@ spec:
resources:
- kind: NotificationEndpointHTTP
name: name1
method: GET
`,
},
},
@ -2866,6 +2872,7 @@ spec:
- kind: NotificationEndpointHTTP
name: name1
type: none
method: POST
url: d_____-_8**(*https://www.examples.coms
`,
},
@ -2873,9 +2880,9 @@ spec:
{
kind: KindNotificationEndpointHTTP,
resErr: testPkgResourceError{
name: "bad url",
name: "missing http method",
validationErrs: 1,
valFields: []string{fieldNotificationEndpointURL},
valFields: []string{fieldNotificationEndpointHTTPMethod},
pkgStr: `apiVersion: 0.1.0
kind: Package
meta:
@ -2887,7 +2894,29 @@ spec:
- kind: NotificationEndpointHTTP
name: name1
type: none
url: d_____-_8**(*https://www.examples.coms
url: http://example.com
`,
},
},
{
kind: KindNotificationEndpointHTTP,
resErr: testPkgResourceError{
name: "invalid http method",
validationErrs: 1,
valFields: []string{fieldNotificationEndpointHTTPMethod},
pkgStr: `apiVersion: 0.1.0
kind: Package
meta:
pkgName: pkg_name
pkgVersion: 1
description: pack description
spec:
resources:
- kind: NotificationEndpointHTTP
name: name1
type: none
method: GUT
url: http://example.com
`,
},
},
@ -2909,6 +2938,7 @@ spec:
name: name1
type: basic
url: example.com
method: POST
password: password
`,
},
@ -2930,6 +2960,7 @@ spec:
- kind: NotificationEndpointHTTP
name: name1
type: basic
method: POST
url: example.com
username: user
`,
@ -2952,6 +2983,7 @@ spec:
- kind: NotificationEndpointHTTP
name: name1
type: basic
method: POST
url: example.com
`,
},
@ -2973,6 +3005,7 @@ spec:
- kind: NotificationEndpointHTTP
name: name1
type: bearer
method: GET
url: example.com
`,
},
@ -2994,27 +3027,7 @@ spec:
- kind: NotificationEndpointHTTP
name: name1
type: threeve
url: example.com
`,
},
},
{
kind: KindNotificationEndpointHTTP,
resErr: testPkgResourceError{
name: "invalid http type",
validationErrs: 1,
valFields: []string{fieldType},
pkgStr: `apiVersion: 0.1.0
kind: Package
meta:
pkgName: pkg_name
pkgVersion: 1
description: pack description
spec:
resources:
- kind: NotificationEndpointHTTP
name: name1
type: threeve
method: GET
url: example.com
`,
},
@ -3609,3 +3622,7 @@ func testfileRunner(t *testing.T, path string, testFn func(t *testing.T, pkg *Pk
t.Run(tt.name, fn)
}
}
func strPtr(s string) *string {
return &s
}

View File

@ -533,11 +533,11 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ
diff := Diff{
Buckets: diffBuckets,
Dashboards: s.dryRunDashboards(ctx, orgID, pkg),
Dashboards: s.dryRunDashboards(pkg),
Labels: diffLabels,
LabelMappings: diffLabelMappings,
NotificationEndpoints: diffEndpoints,
Telegrafs: s.dryRunTelegraf(ctx, orgID, pkg),
Telegrafs: s.dryRunTelegraf(pkg),
Variables: diffVars,
}
return pkg.Summary(), diff, parseErr
@ -571,7 +571,7 @@ func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg
return diffs, nil
}
func (s *Service) dryRunDashboards(_ context.Context, _ influxdb.ID, pkg *Pkg) []DiffDashboard {
func (s *Service) dryRunDashboards(pkg *Pkg) []DiffDashboard {
var diffs []DiffDashboard
for _, d := range pkg.dashboards() {
diffs = append(diffs, newDiffDashboard(d))
@ -649,7 +649,7 @@ func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxd
return diffs, nil
}
func (s *Service) dryRunTelegraf(_ context.Context, _ influxdb.ID, pkg *Pkg) []DiffTelegraf {
func (s *Service) dryRunTelegraf(pkg *Pkg) []DiffTelegraf {
var diffs []DiffTelegraf
for _, t := range pkg.telegrafs() {
diffs = append(diffs, newDiffTelegraf(t))
@ -825,30 +825,39 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S
// each grouping here runs for its entirety, then returns an error that
// is indicative of running all appliers provided. For instance, the labels
// may have 1 label fail and one of the buckets fails. The errors aggregate so
// the caller will be informed of both the failed label and the failed bucket.
// may have 1 variable fail and one of the buckets fails. The errors aggregate so
// the caller will be informed of both the failed label variable the failed bucket.
// the groupings here allow for steps to occur before exiting. The first step is
// adding the primary resources. Here we get all the errors associated with them.
// adding the dependencies, resources that are associated by other resources. Then the
// primary resources. Here we get all the errors associated with them.
// If those are all good, then we run the secondary(dependent) resources which
// rely on the primary resources having been created.
primary := []applier{
// primary resources
s.applyLabels(pkg.labels()),
s.applyVariables(pkg.variables()),
s.applyBuckets(pkg.buckets()),
s.applyDashboards(pkg.dashboards()),
s.applyTelegrafs(pkg.telegrafs()),
}
if err := coordinator.runTilEnd(ctx, orgID, primary...); err != nil {
return Summary{}, err
appliers := [][]applier{
// want to make all dependencies for belwo donezo before moving on to resources
// that have dependencies on lables
{
// deps for primary resources
s.applyLabels(pkg.labels()),
},
{
// primary resources
s.applyVariables(pkg.variables()),
s.applyBuckets(pkg.buckets()),
s.applyDashboards(pkg.dashboards()),
s.applyNotificationEndpoints(pkg.notificationEndpoints()),
s.applyTelegrafs(pkg.telegrafs()),
},
}
// secondary grouping relies on state being available from the primary run.
// the first example here is label mappings which relies on ids provided
// from the newly created resources in primary.
secondary := []applier{
s.applyLabelMappings(pkg.labelMappings()),
for _, group := range appliers {
if err := coordinator.runTilEnd(ctx, orgID, group...); err != nil {
return Summary{}, err
}
}
// secondary resources
// this last grouping relies on the above 2 steps having completely successfully
secondary := []applier{s.applyLabelMappings(pkg.labelMappings())}
if err := coordinator.runTilEnd(ctx, orgID, secondary...); err != nil {
return Summary{}, err
}
@ -1117,11 +1126,7 @@ func (s *Service) applyLabel(ctx context.Context, l label) (influxdb.Label, erro
return *updatedlabel, nil
}
influxLabel := influxdb.Label{
OrgID: l.OrgID,
Name: l.Name(),
Properties: l.properties(),
}
influxLabel := l.toInfluxLabel()
err := s.labelSVC.CreateLabel(ctx, &influxLabel)
if err != nil {
return influxdb.Label{}, err
@ -1130,6 +1135,94 @@ func (s *Service) applyLabel(ctx context.Context, l label) (influxdb.Label, erro
return influxLabel, nil
}
func (s *Service) applyNotificationEndpoints(endpoints []*notificationEndpoint) applier {
const resource = "notification_endpoints"
mutex := new(doMutex)
rollbackEndpoints := make([]*notificationEndpoint, 0, len(endpoints))
createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody {
var endpoint notificationEndpoint
mutex.Do(func() {
endpoints[i].OrgID = orgID
endpoint = *endpoints[i]
})
influxEndpoint, err := s.applyNotificationEndpoint(ctx, endpoint)
if err != nil {
return &applyErrBody{
name: endpoint.Name(),
msg: err.Error(),
}
}
mutex.Do(func() {
endpoints[i].id = influxEndpoint.GetID()
rollbackEndpoints = append(rollbackEndpoints, endpoints[i])
})
return nil
}
return applier{
creater: creater{
entries: len(endpoints),
fn: createFn,
},
rollbacker: rollbacker{
resource: resource,
fn: func() error {
return s.rollbackNotificationEndpoints(rollbackEndpoints)
},
},
}
}
func (s *Service) applyNotificationEndpoint(ctx context.Context, e notificationEndpoint) (influxdb.NotificationEndpoint, error) {
if e.existing != nil {
// stub out userID since we're always using hte http client which will fill it in for us with the token
// feels a bit broken that is required.
// TODO: look into this userID requirement
updatedEndpoint, err := s.endpointSVC.UpdateNotificationEndpoint(ctx, e.ID(), e.existing, 0)
if err != nil {
return nil, err
}
return updatedEndpoint, nil
}
actual := e.summarize().NotificationEndpoint
err := s.endpointSVC.CreateNotificationEndpoint(ctx, actual, 0)
if err != nil {
return nil, err
}
return actual, nil
}
func (s *Service) rollbackNotificationEndpoints(endpoints []*notificationEndpoint) error {
var errs []string
for _, e := range endpoints {
if e.existing == nil {
_, _, err := s.endpointSVC.DeleteNotificationEndpoint(context.Background(), e.ID())
if err != nil {
errs = append(errs, e.ID().String())
}
continue
}
_, err := s.endpointSVC.UpdateNotificationEndpoint(context.Background(), e.ID(), e.existing, 0)
if err != nil {
errs = append(errs, e.ID().String())
}
}
if len(errs) > 0 {
return fmt.Errorf(`notication_endpoint_ids=[%s] err="unable to delete"`, strings.Join(errs, ", "))
}
return nil
}
func (s *Service) applyTelegrafs(teles []*telegraf) applier {
const resource = "telegrafs"

View File

@ -230,7 +230,7 @@ func TestService(t *testing.T) {
Status: influxdb.TaskStatusActive,
},
AuthMethod: "none",
Method: "POST",
Method: "GET",
URL: "https://www.example.com/endpoint/noneauth",
},
},
@ -713,6 +713,77 @@ func TestService(t *testing.T) {
})
t.Run("notification endpoints", func(t *testing.T) {
t.Run("successfully creates pkg of endpoints", func(t *testing.T) {
testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) {
fakeEndpointSVC := mock.NewNotificationEndpointService()
fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error {
nr.SetID(influxdb.ID(fakeEndpointSVC.CreateNotificationEndpointCalls.Count() + 1))
return nil
}
svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC))
orgID := influxdb.ID(9000)
sum, err := svc.Apply(context.TODO(), orgID, pkg)
require.NoError(t, err)
require.Len(t, sum.NotificationEndpoints, 5)
containsWithID := func(t *testing.T, name string) {
for _, actual := range sum.NotificationEndpoints {
if actual.GetID() == 0 {
assert.NotZero(t, actual.GetID())
}
if actual.GetName() == name {
return
}
}
assert.Fail(t, "did not find notification by name: "+name)
}
expectedNames := []string{
"http_basic_auth_notification_endpoint",
"http_bearer_auth_notification_endpoint",
"http_none_auth_notification_endpoint",
"pager_duty_notification_endpoint",
"slack_notification_endpoint",
}
for _, expectedName := range expectedNames {
containsWithID(t, expectedName)
}
})
})
t.Run("rolls back all created notifications on an error", func(t *testing.T) {
testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) {
fakeEndpointSVC := mock.NewNotificationEndpointService()
fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error {
nr.SetID(influxdb.ID(fakeEndpointSVC.CreateNotificationEndpointCalls.Count() + 1))
if fakeEndpointSVC.CreateNotificationEndpointCalls.Count() == 5 {
return errors.New("hit that kill count")
}
return nil
}
// create some dupes
for name, endpoint := range pkg.mNotificationEndpoints {
pkg.mNotificationEndpoints["copy"+name] = endpoint
}
svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC))
orgID := influxdb.ID(9000)
_, err := svc.Apply(context.TODO(), orgID, pkg)
require.Error(t, err)
assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 5)
})
})
})
t.Run("telegrafs", func(t *testing.T) {
t.Run("successfuly creates", func(t *testing.T) {
testfileRunner(t, "testdata/telegraf.yml", func(t *testing.T, pkg *Pkg) {

View File

@ -30,6 +30,7 @@
"kind": "NotificationEndpointHTTP",
"name": "http_none_auth_notification_endpoint",
"description": "http none auth desc",
"method": "GET",
"type": "none",
"url": "https://www.example.com/endpoint/noneauth",
"status": "active",
@ -44,6 +45,7 @@
"kind": "NotificationEndpointHTTP",
"name": "http_basic_auth_notification_endpoint",
"description": "http basic auth desc",
"method": "POST",
"type": "basic",
"url": "https://www.example.com/endpoint/basicauth",
"username": "secret username",
@ -61,6 +63,7 @@
"name": "http_bearer_auth_notification_endpoint",
"description": "http bearer auth desc",
"type": "bearer",
"method": "PUT",
"url": "https://www.example.com/endpoint/bearerauth",
"token": "secret token",
"associations": [

View File

@ -21,6 +21,7 @@ spec:
name: http_none_auth_notification_endpoint
type: none
description: http none auth desc
method: get
url: https://www.example.com/endpoint/noneauth
status: active
associations:
@ -30,6 +31,7 @@ spec:
name: http_basic_auth_notification_endpoint
description: http basic auth desc
type: basic
method: pOsT
url: https://www.example.com/endpoint/basicauth
username: "secret username"
password: "secret password"
@ -41,6 +43,7 @@ spec:
name: http_bearer_auth_notification_endpoint
description: http bearer auth desc
type: bearer
method: puT
url: https://www.example.com/endpoint/bearerauth
token: "secret token"
associations: