Merge pull request #14639 from influxdata/notification_endpoint

feat(influxdb): Add notification endpoint
pull/14684/head
kelwang 2019-08-15 22:57:07 -04:00 committed by GitHub
commit 4e3025efe0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 6377 additions and 59 deletions

View File

@ -0,0 +1,139 @@
package authorizer
import (
"context"
"github.com/influxdata/influxdb"
)
var _ influxdb.NotificationEndpointService = (*NotificationEndpointService)(nil)
// NotificationEndpointService wraps a influxdb.NotificationEndpointService and authorizes actions
// against it appropriately.
type NotificationEndpointService struct {
s influxdb.NotificationEndpointService
influxdb.UserResourceMappingService
influxdb.OrganizationService
influxdb.SecretService
}
// NewNotificationEndpointService constructs an instance of an authorizing notification endpoint serivce.
func NewNotificationEndpointService(
s influxdb.NotificationEndpointService,
urm influxdb.UserResourceMappingService,
org influxdb.OrganizationService,
srt influxdb.SecretService,
) *NotificationEndpointService {
return &NotificationEndpointService{
s: s,
UserResourceMappingService: urm,
OrganizationService: org,
SecretService: srt,
}
}
// FindNotificationEndpointByID checks to see if the authorizer on context has read access to the id provided.
func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
edp, err := s.s.FindNotificationEndpointByID(ctx, id)
if err != nil {
return nil, err
}
if err := authorizeReadOrg(ctx, edp.GetOrgID()); err != nil {
return nil, err
}
return edp, nil
}
// FindNotificationEndpoints retrieves all notification endpoints that match the provided filter and then filters the list down to only the resources that are authorized.
func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
// TODO: we'll likely want to push this operation into the database eventually since fetching the whole list of data
// will likely be expensive.
edps, _, err := s.s.FindNotificationEndpoints(ctx, filter, opt...)
if err != nil {
return nil, 0, err
}
// This filters without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
endpoints := edps[:0]
for _, edp := range edps {
err := authorizeReadOrg(ctx, edp.GetOrgID())
if err != nil && influxdb.ErrorCode(err) != influxdb.EUnauthorized {
return nil, 0, err
}
if influxdb.ErrorCode(err) == influxdb.EUnauthorized {
continue
}
endpoints = append(endpoints, edp)
}
return endpoints, len(endpoints), nil
}
// CreateNotificationEndpoint checks to see if the authorizer on context has write access to the global notification endpoint resource.
func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
p, err := influxdb.NewPermission(influxdb.WriteAction, influxdb.NotificationEndpointResourceType, edp.GetOrgID())
if err != nil {
return err
}
pOrg, err := newOrgPermission(influxdb.WriteAction, edp.GetOrgID())
if err != nil {
return err
}
err0 := IsAllowed(ctx, *p)
err1 := IsAllowed(ctx, *pOrg)
if err0 != nil && err1 != nil {
return err0
}
return s.s.CreateNotificationEndpoint(ctx, edp, userID)
}
// UpdateNotificationEndpoint checks to see if the authorizer on context has write access to the notification endpoint provided.
func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
edp, err := s.FindNotificationEndpointByID(ctx, id)
if err != nil {
return nil, err
}
if err := authorizeWriteOrg(ctx, edp.GetOrgID()); err != nil {
return nil, err
}
return s.s.UpdateNotificationEndpoint(ctx, id, upd, userID)
}
// PatchNotificationEndpoint checks to see if the authorizer on context has write access to the notification endpoint provided.
func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
edp, err := s.FindNotificationEndpointByID(ctx, id)
if err != nil {
return nil, err
}
if err := authorizeWriteOrg(ctx, edp.GetOrgID()); err != nil {
return nil, err
}
return s.s.PatchNotificationEndpoint(ctx, id, upd)
}
// DeleteNotificationEndpoint checks to see if the authorizer on context has write access to the notification endpoint provided.
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) error {
edp, err := s.FindNotificationEndpointByID(ctx, id)
if err != nil {
return err
}
if err := authorizeWriteOrg(ctx, edp.GetOrgID()); err != nil {
return err
}
return s.s.DeleteNotificationEndpoint(ctx, id)
}

View File

@ -0,0 +1,737 @@
package authorizer_test
import (
"bytes"
"context"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/authorizer"
influxdbcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/endpoint"
influxdbtesting "github.com/influxdata/influxdb/testing"
)
var notificationEndpointCmpOptions = cmp.Options{
cmp.Comparer(func(x, y []byte) bool {
return bytes.Equal(x, y)
}),
cmp.Transformer("Sort", func(in []influxdb.NotificationEndpoint) []influxdb.NotificationEndpoint {
out := append([]influxdb.NotificationEndpoint(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].GetID().String() > out[j].GetID().String()
})
return out
}),
}
func TestNotificationEndpointService_FindNotificationEndpointByID(t *testing.T) {
type fields struct {
NotificationEndpointService influxdb.NotificationEndpointService
}
type args struct {
permission influxdb.Permission
id influxdb.ID
}
type wants struct {
err error
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "authorized to access id with org",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: id,
OrgID: 10,
},
}, nil
},
},
},
args: args{
permission: influxdb.Permission{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
id: 1,
},
wants: wants{
err: nil,
},
},
{
name: "unauthorized to access id",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: id,
OrgID: 10,
},
}, nil
},
},
},
args: args{
permission: influxdb.Permission{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.NotificationEndpointResourceType,
ID: influxdbtesting.IDPtr(2),
},
},
id: 1,
},
wants: wants{
err: &influxdb.Error{
Msg: "read:orgs/000000000000000a is unauthorized",
Code: influxdb.EUnauthorized,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := authorizer.NewNotificationEndpointService(tt.fields.NotificationEndpointService, mock.NewUserResourceMappingService(), mock.NewOrganizationService(), mock.NewSecretService())
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{[]influxdb.Permission{tt.args.permission}})
_, err := s.FindNotificationEndpointByID(ctx, tt.args.id)
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
})
}
}
func TestNotificationEndpointService_FindNotificationEndpoints(t *testing.T) {
type fields struct {
NotificationEndpointService influxdb.NotificationEndpointService
}
type args struct {
permission influxdb.Permission
}
type wants struct {
err error
notificationEndpoints []influxdb.NotificationEndpoint
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "authorized to see all notificationEndpoints",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointsF: func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
return []influxdb.NotificationEndpoint{
&endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
},
&endpoint.Slack{
Base: endpoint.Base{
ID: 2,
OrgID: 10,
},
},
&endpoint.WebHook{
Base: endpoint.Base{
ID: 3,
OrgID: 11,
},
},
}, 3, nil
},
},
},
args: args{
permission: influxdb.Permission{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
},
},
},
wants: wants{
notificationEndpoints: []influxdb.NotificationEndpoint{
&endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
},
&endpoint.Slack{
Base: endpoint.Base{
ID: 2,
OrgID: 10,
},
},
&endpoint.WebHook{
Base: endpoint.Base{
ID: 3,
OrgID: 11,
},
},
},
},
},
{
name: "authorized to access a single orgs notificationEndpoints",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointsF: func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
return []influxdb.NotificationEndpoint{
&endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
},
&endpoint.Slack{
Base: endpoint.Base{
ID: 2,
OrgID: 10,
},
},
&endpoint.WebHook{
Base: endpoint.Base{
ID: 3,
OrgID: 11,
},
},
}, 3, nil
},
},
},
args: args{
permission: influxdb.Permission{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
wants: wants{
notificationEndpoints: []influxdb.NotificationEndpoint{
&endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
},
&endpoint.Slack{
Base: endpoint.Base{
ID: 2,
OrgID: 10,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := authorizer.NewNotificationEndpointService(tt.fields.NotificationEndpointService,
mock.NewUserResourceMappingService(),
mock.NewOrganizationService(), mock.NewSecretService())
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{[]influxdb.Permission{tt.args.permission}})
edps, _, err := s.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{})
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
if diff := cmp.Diff(edps, tt.wants.notificationEndpoints, notificationEndpointCmpOptions...); diff != "" {
t.Errorf("notificationEndpoints are different -got/+want\ndiff %s", diff)
}
})
}
}
func TestNotificationEndpointService_UpdateNotificationEndpoint(t *testing.T) {
type fields struct {
NotificationEndpointService influxdb.NotificationEndpointService
}
type args struct {
id influxdb.ID
permissions []influxdb.Permission
}
type wants struct {
err error
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "authorized to update notificationEndpoint with org owner",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctc context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
UpdateNotificationEndpointF: func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
},
},
args: args{
id: 1,
permissions: []influxdb.Permission{
{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
},
wants: wants{
err: nil,
},
},
{
name: "unauthorized to update notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctc context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
UpdateNotificationEndpointF: func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
},
},
args: args{
id: 1,
permissions: []influxdb.Permission{
{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
},
wants: wants{
err: &influxdb.Error{
Msg: "write:orgs/000000000000000a is unauthorized",
Code: influxdb.EUnauthorized,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := authorizer.NewNotificationEndpointService(tt.fields.NotificationEndpointService,
mock.NewUserResourceMappingService(),
mock.NewOrganizationService(), mock.NewSecretService())
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{tt.args.permissions})
_, err := s.UpdateNotificationEndpoint(ctx, tt.args.id, &endpoint.Slack{}, influxdb.ID(1))
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
})
}
}
func TestNotificationEndpointService_PatchNotificationEndpoint(t *testing.T) {
type fields struct {
NotificationEndpointService influxdb.NotificationEndpointService
}
type args struct {
id influxdb.ID
permissions []influxdb.Permission
}
type wants struct {
err error
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "authorized to patch notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctc context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
PatchNotificationEndpointF: func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
},
},
args: args{
id: 1,
permissions: []influxdb.Permission{
{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
},
wants: wants{
err: nil,
},
},
{
name: "unauthorized to patch notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctc context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
PatchNotificationEndpointF: func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
},
},
args: args{
id: 1,
permissions: []influxdb.Permission{
{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
},
wants: wants{
err: &influxdb.Error{
Msg: "write:orgs/000000000000000a is unauthorized",
Code: influxdb.EUnauthorized,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := authorizer.NewNotificationEndpointService(tt.fields.NotificationEndpointService, mock.NewUserResourceMappingService(),
mock.NewOrganizationService(), mock.NewSecretService())
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{tt.args.permissions})
_, err := s.PatchNotificationEndpoint(ctx, tt.args.id, influxdb.NotificationEndpointUpdate{})
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
})
}
}
func TestNotificationEndpointService_DeleteNotificationEndpoint(t *testing.T) {
type fields struct {
NotificationEndpointService influxdb.NotificationEndpointService
}
type args struct {
id influxdb.ID
permissions []influxdb.Permission
}
type wants struct {
err error
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "authorized to delete notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctc context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) error {
return nil
},
},
},
args: args{
id: 1,
permissions: []influxdb.Permission{
{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
},
wants: wants{
err: nil,
},
},
{
name: "unauthorized to delete notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
FindNotificationEndpointByIDF: func(ctc context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return &endpoint.Slack{
Base: endpoint.Base{
ID: 1,
OrgID: 10,
},
}, nil
},
DeleteNotificationEndpointF: func(ctx context.Context, id influxdb.ID) error {
return nil
},
},
},
args: args{
id: 1,
permissions: []influxdb.Permission{
{
Action: "read",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
},
wants: wants{
err: &influxdb.Error{
Msg: "write:orgs/000000000000000a is unauthorized",
Code: influxdb.EUnauthorized,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := authorizer.NewNotificationEndpointService(tt.fields.NotificationEndpointService, mock.NewUserResourceMappingService(),
mock.NewOrganizationService(),
mock.NewSecretService(),
)
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{tt.args.permissions})
err := s.DeleteNotificationEndpoint(ctx, tt.args.id)
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
})
}
}
func TestNotificationEndpointService_CreateNotificationEndpoint(t *testing.T) {
type fields struct {
NotificationEndpointService influxdb.NotificationEndpointService
}
type args struct {
permission influxdb.Permission
orgID influxdb.ID
}
type wants struct {
err error
}
tests := []struct {
name string
fields fields
args args
wants wants
}{
{
name: "authorized to create notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
CreateNotificationEndpointF: func(ctx context.Context, tc influxdb.NotificationEndpoint, userID influxdb.ID) error {
return nil
},
},
},
args: args{
orgID: 10,
permission: influxdb.Permission{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.NotificationEndpointResourceType,
OrgID: influxdbtesting.IDPtr(10),
},
},
},
wants: wants{
err: nil,
},
},
{
name: "authorized to create notificationEndpoint with org owner",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
CreateNotificationEndpointF: func(ctx context.Context, tc influxdb.NotificationEndpoint, userID influxdb.ID) error {
return nil
},
},
},
args: args{
orgID: 10,
permission: influxdb.Permission{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.OrgsResourceType,
ID: influxdbtesting.IDPtr(10),
},
},
},
wants: wants{
err: nil,
},
},
{
name: "unauthorized to create notificationEndpoint",
fields: fields{
NotificationEndpointService: &mock.NotificationEndpointService{
CreateNotificationEndpointF: func(ctx context.Context, tc influxdb.NotificationEndpoint, userID influxdb.ID) error {
return nil
},
},
},
args: args{
orgID: 10,
permission: influxdb.Permission{
Action: "write",
Resource: influxdb.Resource{
Type: influxdb.NotificationEndpointResourceType,
ID: influxdbtesting.IDPtr(1),
},
},
},
wants: wants{
err: &influxdb.Error{
Msg: "write:orgs/000000000000000a/notificationEndpoints is unauthorized",
Code: influxdb.EUnauthorized,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := authorizer.NewNotificationEndpointService(tt.fields.NotificationEndpointService,
mock.NewUserResourceMappingService(),
mock.NewOrganizationService(),
mock.NewSecretService())
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, &Authorizer{[]influxdb.Permission{tt.args.permission}})
err := s.CreateNotificationEndpoint(ctx, &endpoint.Slack{
Base: endpoint.Base{
OrgID: tt.args.orgID},
}, influxdb.ID(1))
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
})
}
}

View File

@ -21,7 +21,7 @@ type Check interface {
GenerateFlux() (string, error) GenerateFlux() (string, error)
GetAuthID() ID GetAuthID() ID
json.Marshaler json.Marshaler
Updator Updater
Getter Getter
} }

View File

@ -45,8 +45,14 @@ type AuthorizationCreateFlags struct {
writeDashboardsPermission bool writeDashboardsPermission bool
readDashboardsPermission bool readDashboardsPermission bool
writeCheckPermission bool
readCheckPermission bool
writeNotificationRulePermission bool writeNotificationRulePermission bool
readNotificationRulePermission bool readNotificationRulePermission bool
writeNotificationEndpointPermission bool
readNotificationEndpointPermission bool
} }
var authorizationCreateFlags AuthorizationCreateFlags var authorizationCreateFlags AuthorizationCreateFlags
@ -87,6 +93,12 @@ func init() {
authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.writeNotificationRulePermission, "write-notificationRules", "", false, "Grants the permission to create notificationRules") authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.writeNotificationRulePermission, "write-notificationRules", "", false, "Grants the permission to create notificationRules")
authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.readNotificationRulePermission, "read-notificationRules", "", false, "Grants the permission to read notificationRules") authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.readNotificationRulePermission, "read-notificationRules", "", false, "Grants the permission to read notificationRules")
authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.writeNotificationEndpointPermission, "write-notificationEndpoints", "", false, "Grants the permission to create notificationEndpoints")
authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.readNotificationEndpointPermission, "read-notificationEndpoints", "", false, "Grants the permission to read notificationEndpoints")
authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.writeCheckPermission, "write-checks", "", false, "Grants the permission to create checks")
authorizationCreateCmd.Flags().BoolVarP(&authorizationCreateFlags.readCheckPermission, "read-checks", "", false, "Grants the permission to read checks")
authorizationCmd.AddCommand(authorizationCreateCmd) authorizationCmd.AddCommand(authorizationCreateCmd)
} }
@ -243,6 +255,38 @@ func authorizationCreateF(cmd *cobra.Command, args []string) error {
permissions = append(permissions, *p) permissions = append(permissions, *p)
} }
if authorizationCreateFlags.writeNotificationEndpointPermission {
p, err := platform.NewPermission(platform.WriteAction, platform.NotificationEndpointResourceType, o.ID)
if err != nil {
return err
}
permissions = append(permissions, *p)
}
if authorizationCreateFlags.readNotificationEndpointPermission {
p, err := platform.NewPermission(platform.ReadAction, platform.NotificationEndpointResourceType, o.ID)
if err != nil {
return err
}
permissions = append(permissions, *p)
}
if authorizationCreateFlags.writeCheckPermission {
p, err := platform.NewPermission(platform.WriteAction, platform.ChecksResourceType, o.ID)
if err != nil {
return err
}
permissions = append(permissions, *p)
}
if authorizationCreateFlags.readCheckPermission {
p, err := platform.NewPermission(platform.ReadAction, platform.ChecksResourceType, o.ID)
if err != nil {
return err
}
permissions = append(permissions, *p)
}
authorization := &platform.Authorization{ authorization := &platform.Authorization{
Permissions: permissions, Permissions: permissions,
OrgID: o.ID, OrgID: o.ID,

View File

@ -435,28 +435,29 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.boltClient) m.reg.MustRegister(m.boltClient)
var ( var (
orgSvc platform.OrganizationService = m.kvService orgSvc platform.OrganizationService = m.kvService
authSvc platform.AuthorizationService = m.kvService authSvc platform.AuthorizationService = m.kvService
userSvc platform.UserService = m.kvService userSvc platform.UserService = m.kvService
variableSvc platform.VariableService = m.kvService variableSvc platform.VariableService = m.kvService
bucketSvc platform.BucketService = m.kvService bucketSvc platform.BucketService = m.kvService
sourceSvc platform.SourceService = m.kvService sourceSvc platform.SourceService = m.kvService
sessionSvc platform.SessionService = m.kvService sessionSvc platform.SessionService = m.kvService
passwdsSvc platform.PasswordsService = m.kvService passwdsSvc platform.PasswordsService = m.kvService
dashboardSvc platform.DashboardService = m.kvService dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService orgLogSvc platform.OrganizationOperationLogService = m.kvService
onboardingSvc platform.OnboardingService = m.kvService onboardingSvc platform.OnboardingService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService telegrafSvc platform.TelegrafConfigStore = m.kvService
userResourceSvc platform.UserResourceMappingService = m.kvService userResourceSvc platform.UserResourceMappingService = m.kvService
labelSvc platform.LabelService = m.kvService labelSvc platform.LabelService = m.kvService
secretSvc platform.SecretService = m.kvService secretSvc platform.SecretService = m.kvService
lookupSvc platform.LookupService = m.kvService lookupSvc platform.LookupService = m.kvService
notificationRuleSvc platform.NotificationRuleStore = m.kvService notificationRuleSvc platform.NotificationRuleStore = m.kvService
checkSvc platform.CheckService = m.kvService notificationEndpointSvc platform.NotificationEndpointService = m.kvService
checkSvc platform.CheckService = m.kvService
) )
switch m.secretStore { switch m.secretStore {
@ -628,6 +629,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TaskService: taskSvc, TaskService: taskSvc,
TelegrafService: telegrafSvc, TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc, NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: notificationEndpointSvc,
CheckService: checkSvc, CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc, ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc, ChronografService: chronografSvc,

View File

@ -18,27 +18,28 @@ import (
// APIHandler is a collection of all the service handlers. // APIHandler is a collection of all the service handlers.
type APIHandler struct { type APIHandler struct {
influxdb.HTTPErrorHandler influxdb.HTTPErrorHandler
BucketHandler *BucketHandler BucketHandler *BucketHandler
UserHandler *UserHandler UserHandler *UserHandler
OrgHandler *OrgHandler OrgHandler *OrgHandler
AuthorizationHandler *AuthorizationHandler AuthorizationHandler *AuthorizationHandler
DashboardHandler *DashboardHandler DashboardHandler *DashboardHandler
LabelHandler *LabelHandler LabelHandler *LabelHandler
AssetHandler *AssetHandler AssetHandler *AssetHandler
ChronografHandler *ChronografHandler ChronografHandler *ChronografHandler
ScraperHandler *ScraperHandler ScraperHandler *ScraperHandler
SourceHandler *SourceHandler SourceHandler *SourceHandler
VariableHandler *VariableHandler VariableHandler *VariableHandler
TaskHandler *TaskHandler TaskHandler *TaskHandler
CheckHandler *CheckHandler CheckHandler *CheckHandler
TelegrafHandler *TelegrafHandler TelegrafHandler *TelegrafHandler
QueryHandler *FluxHandler QueryHandler *FluxHandler
WriteHandler *WriteHandler WriteHandler *WriteHandler
DocumentHandler *DocumentHandler DocumentHandler *DocumentHandler
SetupHandler *SetupHandler SetupHandler *SetupHandler
SessionHandler *SessionHandler SessionHandler *SessionHandler
SwaggerHandler http.Handler SwaggerHandler http.Handler
NotificationRuleHandler *NotificationRuleHandler NotificationRuleHandler *NotificationRuleHandler
NotificationEndpointHandler *NotificationEndpointHandler
} }
// APIBackend is all services and associated parameters required to construct // APIBackend is all services and associated parameters required to construct
@ -84,6 +85,7 @@ type APIBackend struct {
OrgLookupService authorizer.OrganizationService OrgLookupService authorizer.OrganizationService
DocumentService influxdb.DocumentService DocumentService influxdb.DocumentService
NotificationRuleStore influxdb.NotificationRuleStore NotificationRuleStore influxdb.NotificationRuleStore
NotificationEndpointService influxdb.NotificationEndpointService
} }
// PrometheusCollectors exposes the prometheus collectors associated with an APIBackend. // PrometheusCollectors exposes the prometheus collectors associated with an APIBackend.
@ -167,6 +169,11 @@ func NewAPIHandler(b *APIBackend) *APIHandler {
b.UserResourceMappingService, b.OrganizationService) b.UserResourceMappingService, b.OrganizationService)
h.NotificationRuleHandler = NewNotificationRuleHandler(notificationRuleBackend) h.NotificationRuleHandler = NewNotificationRuleHandler(notificationRuleBackend)
notificationEndpointBackend := NewNotificationEndpointBackend(b)
notificationEndpointBackend.NotificationEndpointService = authorizer.NewNotificationEndpointService(b.NotificationEndpointService,
b.UserResourceMappingService, b.OrganizationService, b.SecretService)
h.NotificationEndpointHandler = NewNotificationEndpointHandler(notificationEndpointBackend)
checkBackend := NewCheckBackend(b) checkBackend := NewCheckBackend(b)
checkBackend.CheckService = authorizer.NewCheckService(b.CheckService, checkBackend.CheckService = authorizer.NewCheckService(b.CheckService,
b.UserResourceMappingService, b.OrganizationService) b.UserResourceMappingService, b.OrganizationService)
@ -194,11 +201,12 @@ var apiLinks = map[string]interface{}{
"external": map[string]string{ "external": map[string]string{
"statusFeed": "https://www.influxdata.com/feed/json", "statusFeed": "https://www.influxdata.com/feed/json",
}, },
"labels": "/api/v2/labels", "labels": "/api/v2/labels",
"variables": "/api/v2/variables", "variables": "/api/v2/variables",
"me": "/api/v2/me", "me": "/api/v2/me",
"notificationRules": "/api/v2/notificationRules", "notificationRules": "/api/v2/notificationRules",
"orgs": "/api/v2/orgs", "notificationEndpoints": "/api/v2/notificationEndpoints",
"orgs": "/api/v2/orgs",
"query": map[string]string{ "query": map[string]string{
"self": "/api/v2/query", "self": "/api/v2/query",
"ast": "/api/v2/query/ast", "ast": "/api/v2/query/ast",
@ -328,6 +336,11 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
if strings.HasPrefix(r.URL.Path, "/api/v2/notificationEndpoints") {
h.NotificationEndpointHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/api/v2/variables") { if strings.HasPrefix(r.URL.Path, "/api/v2/variables") {
h.VariableHandler.ServeHTTP(w, r) h.VariableHandler.ServeHTTP(w, r)
return return

View File

@ -0,0 +1,487 @@
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/influxdata/influxdb"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/notification/endpoint"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
)
// NotificationEndpointBackend is all services and associated parameters required to construct
// the NotificationEndpointBackendHandler.
type NotificationEndpointBackend struct {
influxdb.HTTPErrorHandler
Logger *zap.Logger
NotificationEndpointService influxdb.NotificationEndpointService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}
// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
func NewNotificationEndpointBackend(b *APIBackend) *NotificationEndpointBackend {
return &NotificationEndpointBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "notificationEndpoint")),
NotificationEndpointService: b.NotificationEndpointService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
}
// NotificationEndpointHandler is the handler for the notificationEndpoint service
type NotificationEndpointHandler struct {
*httprouter.Router
influxdb.HTTPErrorHandler
Logger *zap.Logger
NotificationEndpointService influxdb.NotificationEndpointService
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}
const (
notificationEndpointsPath = "/api/v2/notificationEndpoints"
notificationEndpointsIDPath = "/api/v2/notificationEndpoints/:id"
notificationEndpointsIDMembersPath = "/api/v2/notificationEndpoints/:id/members"
notificationEndpointsIDMembersIDPath = "/api/v2/notificationEndpoints/:id/members/:userID"
notificationEndpointsIDOwnersPath = "/api/v2/notificationEndpoints/:id/owners"
notificationEndpointsIDOwnersIDPath = "/api/v2/notificationEndpoints/:id/owners/:userID"
notificationEndpointsIDLabelsPath = "/api/v2/notificationEndpoints/:id/labels"
notificationEndpointsIDLabelsIDPath = "/api/v2/notificationEndpoints/:id/labels/:lid"
)
// NewNotificationEndpointHandler returns a new instance of NotificationEndpointHandler.
func NewNotificationEndpointHandler(b *NotificationEndpointBackend) *NotificationEndpointHandler {
h := &NotificationEndpointHandler{
Router: NewRouter(b.HTTPErrorHandler),
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger,
NotificationEndpointService: b.NotificationEndpointService,
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
h.HandlerFunc("POST", notificationEndpointsPath, h.handlePostNotificationEndpoint)
h.HandlerFunc("GET", notificationEndpointsPath, h.handleGetNotificationEndpoints)
h.HandlerFunc("GET", notificationEndpointsIDPath, h.handleGetNotificationEndpoint)
h.HandlerFunc("DELETE", notificationEndpointsIDPath, h.handleDeleteNotificationEndpoint)
h.HandlerFunc("PUT", notificationEndpointsIDPath, h.handlePutNotificationEndpoint)
h.HandlerFunc("PATCH", notificationEndpointsIDPath, h.handlePatchNotificationEndpoint)
memberBackend := MemberBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "member")),
ResourceType: influxdb.NotificationEndpointResourceType,
UserType: influxdb.Member,
UserResourceMappingService: b.UserResourceMappingService,
UserService: b.UserService,
}
h.HandlerFunc("POST", notificationEndpointsIDMembersPath, newPostMemberHandler(memberBackend))
h.HandlerFunc("GET", notificationEndpointsIDMembersPath, newGetMembersHandler(memberBackend))
h.HandlerFunc("DELETE", notificationEndpointsIDMembersIDPath, newDeleteMemberHandler(memberBackend))
ownerBackend := MemberBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "member")),
ResourceType: influxdb.NotificationEndpointResourceType,
UserType: influxdb.Owner,
UserResourceMappingService: b.UserResourceMappingService,
UserService: b.UserService,
}
h.HandlerFunc("POST", notificationEndpointsIDOwnersPath, newPostMemberHandler(ownerBackend))
h.HandlerFunc("GET", notificationEndpointsIDOwnersPath, newGetMembersHandler(ownerBackend))
h.HandlerFunc("DELETE", notificationEndpointsIDOwnersIDPath, newDeleteMemberHandler(ownerBackend))
labelBackend := &LabelBackend{
HTTPErrorHandler: b.HTTPErrorHandler,
Logger: b.Logger.With(zap.String("handler", "label")),
LabelService: b.LabelService,
ResourceType: influxdb.TelegrafsResourceType,
}
h.HandlerFunc("GET", notificationEndpointsIDLabelsIDPath, newGetLabelsHandler(labelBackend))
h.HandlerFunc("POST", notificationEndpointsIDLabelsPath, newPostLabelHandler(labelBackend))
h.HandlerFunc("DELETE", notificationEndpointsIDLabelsIDPath, newDeleteLabelHandler(labelBackend))
return h
}
type notificationEndpointLinks struct {
Self string `json:"self"`
Labels string `json:"labels"`
Members string `json:"members"`
Owners string `json:"owners"`
}
type notificationEndpointResponse struct {
influxdb.NotificationEndpoint
Labels []influxdb.Label `json:"labels"`
Links notificationEndpointLinks `json:"links"`
}
func (resp notificationEndpointResponse) MarshalJSON() ([]byte, error) {
b1, err := json.Marshal(resp.NotificationEndpoint)
if err != nil {
return nil, err
}
b2, err := json.Marshal(struct {
Labels []influxdb.Label `json:"labels"`
Links notificationEndpointLinks `json:"links"`
}{
Links: resp.Links,
Labels: resp.Labels,
})
if err != nil {
return nil, err
}
return []byte(string(b1[:len(b1)-1]) + ", " + string(b2[1:])), nil
}
type notificationEndpointsResponse struct {
NotificationEndpoints []*notificationEndpointResponse `json:"notificationEndpoints"`
Links *influxdb.PagingLinks `json:"links"`
}
func newNotificationEndpointResponse(edp influxdb.NotificationEndpoint, labels []*influxdb.Label) *notificationEndpointResponse {
res := &notificationEndpointResponse{
NotificationEndpoint: edp,
Links: notificationEndpointLinks{
Self: fmt.Sprintf("/api/v2/notificationEndpoints/%s", edp.GetID()),
Labels: fmt.Sprintf("/api/v2/notificationEndpoints/%s/labels", edp.GetID()),
Members: fmt.Sprintf("/api/v2/notificationEndpoints/%s/members", edp.GetID()),
Owners: fmt.Sprintf("/api/v2/notificationEndpoints/%s/owners", edp.GetID()),
},
Labels: []influxdb.Label{},
}
for _, l := range labels {
res.Labels = append(res.Labels, *l)
}
return res
}
func newNotificationEndpointsResponse(ctx context.Context, edps []influxdb.NotificationEndpoint, labelService influxdb.LabelService, f influxdb.PagingFilter, opts influxdb.FindOptions) *notificationEndpointsResponse {
resp := &notificationEndpointsResponse{
NotificationEndpoints: make([]*notificationEndpointResponse, len(edps)),
Links: newPagingLinks(notificationEndpointsPath, opts, f, len(edps)),
}
for i, edp := range edps {
labels, _ := labelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: edp.GetID()})
resp.NotificationEndpoints[i] = newNotificationEndpointResponse(edp, labels)
}
return resp
}
func decodeGetNotificationEndpointRequest(ctx context.Context, r *http.Request) (i influxdb.ID, err error) {
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
return i, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "url missing id",
}
}
if err := i.DecodeFromString(id); err != nil {
return i, err
}
return i, nil
}
func (h *NotificationEndpointHandler) handleGetNotificationEndpoints(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.Logger.Debug("notificationEndpoints retrieve request", zap.String("r", fmt.Sprint(r)))
filter, opts, err := decodeNotificationEndpointFilter(ctx, r)
if err != nil {
h.Logger.Debug("failed to decode request", zap.Error(err))
h.HandleHTTPError(ctx, err, w)
return
}
edps, _, err := h.NotificationEndpointService.FindNotificationEndpoints(ctx, *filter, *opts)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoints retrieved", zap.String("notificationEndpoints", fmt.Sprint(edps)))
if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointsResponse(ctx, edps, h.LabelService, filter, *opts)); err != nil {
logEncodingError(h.Logger, r, err)
return
}
}
func (h *NotificationEndpointHandler) handleGetNotificationEndpoint(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.Logger.Debug("notificationEndpoint retrieve request", zap.String("r", fmt.Sprint(r)))
id, err := decodeGetNotificationEndpointRequest(ctx, r)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
edp, err := h.NotificationEndpointService.FindNotificationEndpointByID(ctx, id)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint retrieved", zap.String("notificationEndpoint", fmt.Sprint(edp)))
labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: edp.GetID()})
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil {
logEncodingError(h.Logger, r, err)
return
}
}
func decodeNotificationEndpointFilter(ctx context.Context, r *http.Request) (*influxdb.NotificationEndpointFilter, *influxdb.FindOptions, error) {
f := &influxdb.NotificationEndpointFilter{}
opts, err := decodeFindOptions(ctx, r)
if err != nil {
return f, nil, err
}
q := r.URL.Query()
if orgIDStr := q.Get("orgID"); orgIDStr != "" {
orgID, err := influxdb.IDFromString(orgIDStr)
if err != nil {
return f, opts, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "orgID is invalid",
Err: err,
}
}
f.OrgID = orgID
} else if orgNameStr := q.Get("org"); orgNameStr != "" {
*f.Org = orgNameStr
}
return f, opts, err
}
func decodePostNotificationEndpointRequest(ctx context.Context, r *http.Request) (influxdb.NotificationEndpoint, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(r.Body)
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
defer r.Body.Close()
edp, err := endpoint.UnmarshalJSON(buf.Bytes())
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
return edp, nil
}
func decodePutNotificationEndpointRequest(ctx context.Context, r *http.Request) (influxdb.NotificationEndpoint, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(r.Body)
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
defer r.Body.Close()
edp, err := endpoint.UnmarshalJSON(buf.Bytes())
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "url missing id",
}
}
i := new(influxdb.ID)
if err := i.DecodeFromString(id); err != nil {
return nil, err
}
edp.SetID(*i)
return edp, nil
}
type patchNotificationEndpointRequest struct {
influxdb.ID
Update influxdb.NotificationEndpointUpdate
}
func decodePatchNotificationEndpointRequest(ctx context.Context, r *http.Request) (*patchNotificationEndpointRequest, error) {
req := &patchNotificationEndpointRequest{}
params := httprouter.ParamsFromContext(ctx)
id := params.ByName("id")
if id == "" {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "url missing id",
}
}
var i influxdb.ID
if err := i.DecodeFromString(id); err != nil {
return nil, err
}
req.ID = i
upd := &influxdb.NotificationEndpointUpdate{}
if err := json.NewDecoder(r.Body).Decode(upd); err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: err.Error(),
}
}
if err := upd.Valid(); err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: err.Error(),
}
}
req.Update = *upd
return req, nil
}
// handlePostNotificationEndpoint is the HTTP handler for the POST /api/v2/notificationEndpoints route.
func (h *NotificationEndpointHandler) handlePostNotificationEndpoint(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.Logger.Debug("notificationEndpoint create request", zap.String("r", fmt.Sprint(r)))
edp, err := decodePostNotificationEndpointRequest(ctx, r)
if err != nil {
h.Logger.Debug("failed to decode request", zap.Error(err))
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err := h.NotificationEndpointService.CreateNotificationEndpoint(ctx, edp, auth.GetUserID()); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint created", zap.String("notificationEndpoint", fmt.Sprint(edp)))
if err := encodeResponse(ctx, w, http.StatusCreated, newNotificationEndpointResponse(edp, []*influxdb.Label{})); err != nil {
logEncodingError(h.Logger, r, err)
return
}
}
// handlePutNotificationEndpoint is the HTTP handler for the PUT /api/v2/notificationEndpoints route.
func (h *NotificationEndpointHandler) handlePutNotificationEndpoint(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.Logger.Debug("notificationEndpoint replace request", zap.String("r", fmt.Sprint(r)))
edp, err := decodePutNotificationEndpointRequest(ctx, r)
if err != nil {
h.Logger.Debug("failed to decode request", zap.Error(err))
h.HandleHTTPError(ctx, err, w)
return
}
auth, err := pctx.GetAuthorizer(ctx)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
edp, err = h.NotificationEndpointService.UpdateNotificationEndpoint(ctx, edp.GetID(), edp, auth.GetUserID())
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: edp.GetID()})
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint replaced", zap.String("notificationEndpoint", fmt.Sprint(edp)))
if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil {
logEncodingError(h.Logger, r, err)
return
}
}
// handlePatchNotificationEndpoint is the HTTP handler for the PATCH /api/v2/notificationEndpoints/:id route.
func (h *NotificationEndpointHandler) handlePatchNotificationEndpoint(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.Logger.Debug("notificationEndpoint patch request", zap.String("r", fmt.Sprint(r)))
req, err := decodePatchNotificationEndpointRequest(ctx, r)
if err != nil {
h.Logger.Debug("failed to decode request", zap.Error(err))
h.HandleHTTPError(ctx, err, w)
return
}
edp, err := h.NotificationEndpointService.PatchNotificationEndpoint(ctx, req.ID, req.Update)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
labels, err := h.LabelService.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ResourceID: edp.GetID()})
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint patch", zap.String("notificationEndpoint", fmt.Sprint(edp)))
if err := encodeResponse(ctx, w, http.StatusOK, newNotificationEndpointResponse(edp, labels)); err != nil {
logEncodingError(h.Logger, r, err)
return
}
}
func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.Logger.Debug("notificationEndpoint delete request", zap.String("r", fmt.Sprint(r)))
i, err := decodeGetNotificationEndpointRequest(ctx, r)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
if err = h.NotificationEndpointService.DeleteNotificationEndpoint(ctx, i); err != nil {
h.HandleHTTPError(ctx, err, w)
return
}
h.Logger.Debug("notificationEndpoint deleted", zap.String("notificationEndpointID", fmt.Sprint(i)))
w.WriteHeader(http.StatusNoContent)
}

File diff suppressed because it is too large Load Diff

544
kv/notification_endpoint.go Normal file
View File

@ -0,0 +1,544 @@
package kv
import (
"context"
"encoding/json"
"fmt"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/notification/endpoint"
"github.com/influxdata/influxdb"
)
var (
notificationEndpointBucket = []byte("notificationEndpointv1")
notificationEndpointIndex = []byte("notificationEndpointIndexv1")
// ErrNotificationEndpointNotFound is used when the notification endpoint is not found.
ErrNotificationEndpointNotFound = &influxdb.Error{
Msg: "notification endpoint not found",
Code: influxdb.ENotFound,
}
// ErrInvalidNotificationEndpointID is used when the service was provided
// an invalid ID format.
ErrInvalidNotificationEndpointID = &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "provided notification endpoint ID has invalid format",
}
)
var _ influxdb.NotificationEndpointService = (*Service)(nil)
func (s *Service) initializeNotificationEndpoint(ctx context.Context, tx Tx) error {
if _, err := s.notificationEndpointBucket(tx); err != nil {
return err
}
if _, err := s.notificationEndpointIndexBucket(tx); err != nil {
return err
}
return nil
}
// UnavailableNotificationEndpointStoreError is used if we aren't able to interact with the
// store, it means the store is not available at the moment (e.g. network).
func UnavailableNotificationEndpointStoreError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unable to connect to notification endpoint store service. Please try again; Err: %v", err),
Op: "kv/notificationEndpoint",
}
}
// UnavailableNotificationEndpointIndexError is used when the error comes from an internal system.
func UnavailableNotificationEndpointIndexError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("unexpected error retrieving notification endpoint's index bucket; Err %v", err),
Op: "kv/notificationEndpointIndex",
}
}
// InternalNotificationEndpointStoreError is used when the error comes from an
// internal system.
func InternalNotificationEndpointStoreError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("Unknown internal notification endpoint data error; Err: %v", err),
Op: "kv/notificationEndpoint",
}
}
func (s *Service) notificationEndpointBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(notificationEndpointBucket)
if err != nil {
return nil, UnavailableNotificationEndpointStoreError(err)
}
return b, nil
}
func (s *Service) notificationEndpointIndexBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(notificationEndpointIndex)
if err != nil {
return nil, UnavailableNotificationEndpointIndexError(err)
}
return b, nil
}
// CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier.
func (s *Service) CreateNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
return s.createNotificationEndpoint(ctx, tx, edp, userID)
})
}
func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
if edp.GetOrgID().Valid() {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
_, pe := s.findOrganizationByID(ctx, tx, edp.GetOrgID())
if pe != nil {
return &influxdb.Error{
Op: influxdb.OpCreateCheck,
Err: pe,
}
}
}
// notification endpoint name unique
if _, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), edp.GetName()); err == nil {
if err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("notification endpoint with name %s already exists", edp.GetName()),
}
}
}
id := s.IDGenerator.ID()
edp.SetID(id)
now := s.TimeGenerator.Now()
edp.SetCreatedAt(now)
edp.SetUpdatedAt(now)
edp.BackfillSecretKeys()
for _, fld := range edp.SecretFields() {
if fld.Value != nil {
if err := s.putSecret(ctx, tx, edp.GetOrgID(),
fld.Key, *fld.Value); err != nil {
return InternalNotificationEndpointStoreError(err)
}
}
}
if err := s.putNotificationEndpoint(ctx, tx, edp); err != nil {
return err
}
urm := &influxdb.UserResourceMapping{
ResourceID: id,
UserID: userID,
UserType: influxdb.Owner,
ResourceType: influxdb.NotificationEndpointResourceType,
}
return s.createUserResourceMapping(ctx, tx, urm)
}
func (s *Service) findNotificationEndpointByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (influxdb.NotificationEndpoint, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
key, err := notificationEndpointIndexKey(orgID, n)
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
idx, err := s.notificationEndpointIndexBucket(tx)
if err != nil {
return nil, err
}
buf, err := idx.Get(key)
if IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: fmt.Sprintf("notification endpoint %q not found", n),
}
}
if err != nil {
return nil, err
}
var id influxdb.ID
if err := id.Decode(buf); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
edp, _, _, err := s.findNotificationEndpointByID(ctx, tx, id)
return edp, err
}
// UpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
func (s *Service) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
var err error
err = s.kv.Update(ctx, func(tx Tx) error {
edp, err = s.updateNotificationEndpoint(ctx, tx, id, edp, userID)
return err
})
return edp, err
}
func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
current, _, _, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, err
}
if edp.GetName() != current.GetName() {
edp0, err := s.findNotificationEndpointByName(ctx, tx, current.GetOrgID(), edp.GetName())
if err == nil && edp0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "notification endpoint name is not unique",
}
}
key, err := notificationEndpointIndexKey(current.GetOrgID(), current.GetName())
if err != nil {
return nil, err
}
idx, err := s.notificationEndpointIndexBucket(tx)
if err != nil {
return nil, err
}
if err := idx.Delete(key); err != nil {
return nil, err
}
}
// ID and OrganizationID can not be updated
edp.SetID(current.GetID())
edp.SetOrgID(current.GetOrgID())
edp.SetCreatedAt(current.GetCRUDLog().CreatedAt)
edp.SetUpdatedAt(s.TimeGenerator.Now())
edp.BackfillSecretKeys()
for _, fld := range edp.SecretFields() {
if fld.Value != nil {
if err = s.putSecret(ctx, tx, edp.GetOrgID(),
fld.Key, *fld.Value); err != nil {
return nil, InternalNotificationEndpointStoreError(err)
}
}
}
err = s.putNotificationEndpoint(ctx, tx, edp)
return edp, err
}
// PatchNotificationEndpoint updates a single notification endpoint with changeset.
// Returns the new notification endpoint state after update.
func (s *Service) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
var edp influxdb.NotificationEndpoint
if err := s.kv.Update(ctx, func(tx Tx) (err error) {
edp, err = s.patchNotificationEndpoint(ctx, tx, id, upd)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return edp, nil
}
func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
edp, _, _, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
edp0, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), *upd.Name)
if err == nil && edp0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "notification endpoint name is not unique",
}
}
key, err := notificationEndpointIndexKey(edp.GetOrgID(), edp.GetName())
if err != nil {
return nil, err
}
idx, err := s.notificationEndpointIndexBucket(tx)
if err != nil {
return nil, err
}
if err := idx.Delete(key); err != nil {
return nil, err
}
}
if upd.Name != nil {
edp.SetName(*upd.Name)
}
if upd.Description != nil {
edp.SetDescription(*upd.Description)
}
if upd.Status != nil {
edp.SetStatus(*upd.Status)
}
edp.SetUpdatedAt(s.TimeGenerator.Now())
err = s.putNotificationEndpoint(ctx, tx, edp)
if err != nil {
return nil, err
}
return edp, nil
}
// PutNotificationEndpoint put a notification endpoint to storage.
func (s *Service) PutNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint) error {
return s.kv.Update(ctx, func(tx Tx) (err error) {
return s.putNotificationEndpoint(ctx, tx, edp)
})
}
func (s *Service) putNotificationEndpoint(ctx context.Context, tx Tx, edp influxdb.NotificationEndpoint) error {
if err := edp.Valid(); err != nil {
return err
}
for _, k := range edp.SecretFields() {
if _, err := s.loadSecret(ctx, tx, edp.GetOrgID(), string(k.Key)); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "Unable to locate secret key: " + string(k.Key),
}
}
}
encodedID, _ := edp.GetID().Encode()
v, err := json.Marshal(edp)
if err != nil {
return err
}
key, pe := notificationEndpointIndexKey(edp.GetOrgID(), edp.GetName())
if err != nil {
return pe
}
idx, err := s.notificationEndpointIndexBucket(tx)
if err != nil {
return err
}
if err := idx.Put(key, encodedID); err != nil {
return &influxdb.Error{
Err: err,
}
}
bucket, err := s.notificationEndpointBucket(tx)
if err != nil {
return err
}
if err := bucket.Put(encodedID, v); err != nil {
return UnavailableNotificationEndpointStoreError(err)
}
return nil
}
// notificationEndpointIndexKey is a combination of the orgID and the notification endpoint name.
func notificationEndpointIndexKey(orgID influxdb.ID, name string) ([]byte, error) {
orgIDEncoded, err := orgID.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
k := make([]byte, influxdb.IDLength+len(name))
copy(k, orgIDEncoded)
copy(k[influxdb.IDLength:], []byte(name))
return k, nil
}
// FindNotificationEndpointByID returns a single notification endpoint by ID.
func (s *Service) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
var (
edp influxdb.NotificationEndpoint
err error
)
err = s.kv.View(ctx, func(tx Tx) error {
edp, _, _, err = s.findNotificationEndpointByID(ctx, tx, id)
return err
})
return edp, err
}
func (s *Service) findNotificationEndpointByID(ctx context.Context, tx Tx,
id influxdb.ID) (edp influxdb.NotificationEndpoint, encID []byte, bucket Bucket, err error) {
encID, err = id.Encode()
if err != nil {
return nil, encID, bucket, ErrInvalidNotificationEndpointID
}
bucket, err = s.notificationEndpointBucket(tx)
if err != nil {
return nil, encID, bucket, err
}
v, err := bucket.Get(encID)
if IsNotFound(err) {
return nil, encID, bucket, ErrNotificationEndpointNotFound
}
if err != nil {
return nil, encID, bucket, InternalNotificationEndpointStoreError(err)
}
edp, err = endpoint.UnmarshalJSON(v)
return edp, encID, bucket, err
}
// FindNotificationEndpoints returns a list of notification endpoints that match filter and the total count of matching notification endpoints.
// Additional options provide pagination & sorting.
func (s *Service) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) (edps []influxdb.NotificationEndpoint, n int, err error) {
err = s.kv.View(ctx, func(tx Tx) error {
edps, n, err = s.findNotificationEndpoints(ctx, tx, filter)
return err
})
return edps, n, err
}
func (s *Service) findNotificationEndpoints(ctx context.Context, tx Tx, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
edps := make([]influxdb.NotificationEndpoint, 0)
if filter.Org != nil {
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return nil, 0, &influxdb.Error{
Err: err,
}
}
filter.OrgID = &o.ID
}
var offset, limit, count int
var descending bool
if len(opt) > 0 {
offset = opt[0].Offset
limit = opt[0].Limit
descending = opt[0].Descending
}
filterFn := filterNotificationEndpointsFn(filter)
err := s.forEachNotificationEndpoint(ctx, tx, descending, func(edp influxdb.NotificationEndpoint) bool {
if filterFn(edp) {
if count >= offset {
edps = append(edps, edp)
}
count++
}
if limit > 0 && len(edps) >= limit {
return false
}
return true
})
return edps, len(edps), err
}
// forEachNotificationEndpoint will iterate through all notification endpoints while fn returns true.
func (s *Service) forEachNotificationEndpoint(ctx context.Context, tx Tx, descending bool, fn func(influxdb.NotificationEndpoint) bool) error {
bkt, err := s.notificationEndpointBucket(tx)
if err != nil {
return err
}
cur, err := bkt.Cursor()
if err != nil {
return err
}
var k, v []byte
if descending {
k, v = cur.Last()
} else {
k, v = cur.First()
}
for k != nil {
edp, err := endpoint.UnmarshalJSON(v)
if err != nil {
return err
}
if !fn(edp) {
break
}
if descending {
k, v = cur.Prev()
} else {
k, v = cur.Next()
}
}
return nil
}
func filterNotificationEndpointsFn(filter influxdb.NotificationEndpointFilter) func(edp influxdb.NotificationEndpoint) bool {
return func(edp influxdb.NotificationEndpoint) bool {
if filter.ID != nil {
if edp.GetID() != *filter.ID {
return false
}
}
if filter.OrgID != nil {
if edp.GetOrgID() != *filter.OrgID {
return false
}
}
return true
}
}
// DeleteNotificationEndpoint removes a notification endpoint by ID.
func (s *Service) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
return s.deleteNotificationEndpoint(ctx, tx, id)
})
}
func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID) error {
edp, encID, bucket, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return err
}
if err = bucket.Delete(encID); err != nil {
return InternalNotificationEndpointStoreError(err)
}
for _, fld := range edp.SecretFields() {
if err := s.deleteSecret(ctx, tx, edp.GetOrgID(), fld.Key); err != nil {
InternalNotificationEndpointStoreError(err)
}
}
return s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: id,
ResourceType: influxdb.NotificationEndpointResourceType,
})
}

View File

@ -0,0 +1,111 @@
package kv_test
import (
"context"
"testing"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kv"
influxdbtesting "github.com/influxdata/influxdb/testing"
)
func TestBoltNotificationEndpointService(t *testing.T) {
influxdbtesting.NotificationEndpointService(initBoltNotificationEndpointService, t)
}
func TestNotificationEndpointService(t *testing.T) {
influxdbtesting.NotificationEndpointService(initInmemNotificationEndpointService, t)
}
func initBoltNotificationEndpointService(f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, func()) {
s, closeBolt, err := NewTestBoltStore()
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initNotificationEndpointService(s, f, t)
return svc, func() {
closeSvc()
closeBolt()
}
}
func initInmemNotificationEndpointService(f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, func()) {
s, closeInmem, err := NewTestInmemStore()
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initNotificationEndpointService(s, f, t)
return svc, func() {
closeSvc()
closeInmem()
}
}
func initNotificationEndpointService(s kv.Store, f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, func()) {
svc := kv.NewService(s)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
if f.TimeGenerator == nil {
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, s := range f.Secrets {
for k, v := range s.Env {
if err := svc.PutSecret(ctx, s.OrganizationID, k, v); err != nil {
t.Fatalf("failed to populate secrets")
}
}
}
for _, edp := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, edp); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)
}
}
for _, o := range f.Orgs {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
return svc, func() {
for _, edp := range f.NotificationEndpoints {
if err := svc.DeleteNotificationEndpoint(ctx, edp.GetID()); err != nil {
t.Logf("failed to remove notification endpoint: %v", err)
}
}
for _, o := range f.Orgs {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
t.Fatalf("failed to remove org: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
for _, s := range f.Secrets {
for k, v := range s.Env {
if err := svc.DeleteSecret(ctx, s.OrganizationID, k, v); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Fatalf("failed to populate secrets")
}
}
}
}
}

View File

@ -207,8 +207,8 @@ func decodeSecretKey(key []byte) (influxdb.ID, string, error) {
func decodeSecretValue(val []byte) (string, error) { func decodeSecretValue(val []byte) (string, error) {
// store the secret value base64 encoded so that it's marginally better than plaintext // store the secret value base64 encoded so that it's marginally better than plaintext
v := make([]byte, base64.StdEncoding.DecodedLen(len(val))) v, err := base64.StdEncoding.DecodeString(string(val))
if _, err := base64.StdEncoding.Decode(v, val); err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -134,6 +134,10 @@ func (s *Service) Initialize(ctx context.Context) error {
return err return err
} }
if err := s.initializeNotificationEndpoint(ctx, tx); err != nil {
return err
}
return s.initializeUsers(ctx, tx) return s.initializeUsers(ctx, tx)
}) })
} }

View File

@ -0,0 +1,55 @@
package mock
import (
"context"
"github.com/influxdata/influxdb"
)
var _ influxdb.NotificationEndpointService = &NotificationEndpointService{}
// NotificationEndpointService represents a service for managing notification rule data.
type NotificationEndpointService struct {
OrganizationService
UserResourceMappingService
SecretService
FindNotificationEndpointByIDF func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error)
FindNotificationEndpointsF func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error)
CreateNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error
UpdateNotificationEndpointF func(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error)
PatchNotificationEndpointF func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error)
DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) error
}
// FindNotificationEndpointByID returns a single telegraf config by ID.
func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
return s.FindNotificationEndpointByIDF(ctx, id)
}
// FindNotificationEndpoints returns a list of notification rules that match filter and the total count of matching notification rules.
// Additional options provide pagination & sorting.
func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
return s.FindNotificationEndpointsF(ctx, filter, opt...)
}
// CreateNotificationEndpoint creates a new notification rule and sets ID with the new identifier.
func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error {
return s.CreateNotificationEndpointF(ctx, nr, userID)
}
// UpdateNotificationEndpoint updates a single notification rule.
// Returns the new notification rule after update.
func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
return s.UpdateNotificationEndpointF(ctx, id, nr, userID)
}
// PatchNotificationEndpoint updates a single notification rule with changeset.
// Returns the new notification rule after update.
func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
return s.PatchNotificationEndpointF(ctx, id, upd)
}
// DeleteNotificationEndpoint removes a notification rule by ID.
func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) error {
return s.DeleteNotificationEndpointF(ctx, id)
}

View File

@ -5,9 +5,9 @@ import (
"encoding/json" "encoding/json"
) )
// Updator is general interface to embed // Updater is general interface to embed
// with any domain level interface to do crud related ops. // with any domain level interface to do crud related ops.
type Updator interface { type Updater interface {
CRUDLogSetter CRUDLogSetter
SetID(id ID) SetID(id ID)
SetOrgID(id ID) SetOrgID(id ID)
@ -34,7 +34,7 @@ type NotificationRule interface {
Valid() error Valid() error
Type() string Type() string
json.Marshaler json.Marshaler
Updator Updater
Getter Getter
SetOwnerID(id ID) SetOwnerID(id ID)
GetOwnerID() ID GetOwnerID() ID
@ -71,12 +71,14 @@ func (f NotificationRuleFilter) QueryParams() map[string][]string {
return qp return qp
} }
// NotificationRuleUpdate is the set of upgrade fields for patch request.
type NotificationRuleUpdate struct { type NotificationRuleUpdate struct {
Name *string `json:"name,omitempty"` Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"` Description *string `json:"description,omitempty"`
Status *Status `json:"status,omitempty"` Status *Status `json:"status,omitempty"`
} }
// Valid will verify if the NotificationRuleUpdate is valid.
func (n *NotificationRuleUpdate) Valid() error { func (n *NotificationRuleUpdate) Valid() error {
if n.Name != nil && *n.Name == "" { if n.Name != nil && *n.Name == "" {
return &Error{ return &Error{

View File

@ -0,0 +1,131 @@
package endpoint
import (
"encoding/json"
"fmt"
"github.com/influxdata/influxdb"
)
// types of endpoints.
const (
SlackType = "slack"
PagerDutyType = "pagerduty"
WebhookType = "webhook"
)
var typeToEndpoint = map[string](func() influxdb.NotificationEndpoint){
SlackType: func() influxdb.NotificationEndpoint { return &Slack{} },
PagerDutyType: func() influxdb.NotificationEndpoint { return &PagerDuty{} },
WebhookType: func() influxdb.NotificationEndpoint { return &WebHook{} },
}
type rawJSON struct {
Type string `json:"type"`
}
// UnmarshalJSON will convert the bytes to notification endpoint.
func UnmarshalJSON(b []byte) (influxdb.NotificationEndpoint, error) {
var raw rawJSON
if err := json.Unmarshal(b, &raw); err != nil {
return nil, &influxdb.Error{
Msg: "unable to detect the notification endpoint type from json",
}
}
convertedFunc, ok := typeToEndpoint[raw.Type]
if !ok {
return nil, &influxdb.Error{
Msg: fmt.Sprintf("invalid notification endpoint type %s", raw.Type),
}
}
converted := convertedFunc()
err := json.Unmarshal(b, converted)
return converted, err
}
// Base is the embed struct of every notification endpoint.
type Base struct {
ID influxdb.ID `json:"id,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
OrgID influxdb.ID `json:"orgID,omitempty"`
Status influxdb.Status `json:"status"`
influxdb.CRUDLog
}
func (b Base) valid() error {
if !b.ID.Valid() {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "Notification Endpoint ID is invalid",
}
}
if b.Name == "" {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "Notification Endpoint Name can't be empty",
}
}
if b.Status != influxdb.Active && b.Status != influxdb.Inactive {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid status",
}
}
return nil
}
// GetID implements influxdb.Getter interface.
func (b Base) GetID() influxdb.ID {
return b.ID
}
// GetName implements influxdb.Getter interface.
func (b *Base) GetName() string {
return b.Name
}
// GetOrgID implements influxdb.Getter interface.
func (b Base) GetOrgID() influxdb.ID {
return b.OrgID
}
// GetCRUDLog implements influxdb.Getter interface.
func (b Base) GetCRUDLog() influxdb.CRUDLog {
return b.CRUDLog
}
// GetDescription implements influxdb.Getter interface.
func (b *Base) GetDescription() string {
return b.Description
}
// GetStatus implements influxdb.Getter interface.
func (b *Base) GetStatus() influxdb.Status {
return b.Status
}
// SetID will set the primary key.
func (b *Base) SetID(id influxdb.ID) {
b.ID = id
}
// SetOrgID will set the org key.
func (b *Base) SetOrgID(id influxdb.ID) {
b.OrgID = id
}
// SetName implements influxdb.Updator interface.
func (b *Base) SetName(name string) {
b.Name = name
}
// SetDescription implements influxdb.Updator interface.
func (b *Base) SetDescription(description string) {
b.Description = description
}
// SetStatus implements influxdb.Updator interface.
func (b *Base) SetStatus(status influxdb.Status) {
b.Status = status
}

View File

@ -0,0 +1,391 @@
package endpoint_test
import (
"encoding/json"
"net/http"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/endpoint"
influxTesting "github.com/influxdata/influxdb/testing"
)
const (
id1 = "020f755c3c082000"
id3 = "020f755c3c082002"
)
var goodBase = endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
Description: "desc1",
}
func TestValidEndpoint(t *testing.T) {
cases := []struct {
name string
src influxdb.NotificationEndpoint
err error
}{
{
name: "invalid endpoint id",
src: &endpoint.Slack{},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "Notification Endpoint ID is invalid",
},
},
{
name: "invalid status",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
},
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid status",
},
},
{
name: "empty slack url",
src: &endpoint.Slack{
Base: goodBase,
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "slack endpoint URL is empty",
},
},
{
name: "invalid slack url",
src: &endpoint.Slack{
Base: goodBase,
URL: "posts://er:{DEf1=ghi@:5432/db?ssl",
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "slack endpoint URL is invalid: parse posts://er:{DEf1=ghi@:5432/db?ssl: net/url: invalid userinfo",
},
},
{
name: "empty slack token",
src: &endpoint.Slack{
Base: goodBase,
URL: "localhost",
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "slack endpoint token is invalid",
},
},
{
name: "invalid slack token",
src: &endpoint.Slack{
Base: goodBase,
URL: "localhost",
Token: influxdb.SecretField{Key: "bad-key"},
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "slack endpoint token is invalid",
},
},
{
name: "empty pagerduty url",
src: &endpoint.PagerDuty{
Base: goodBase,
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "pagerduty endpoint URL is empty",
},
},
{
name: "invalid pagerduty url",
src: &endpoint.PagerDuty{
Base: goodBase,
URL: "posts://er:{DEf1=ghi@:5432/db?ssl",
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "pagerduty endpoint URL is invalid: parse posts://er:{DEf1=ghi@:5432/db?ssl: net/url: invalid userinfo",
},
},
{
name: "invalid routine key",
src: &endpoint.PagerDuty{
Base: goodBase,
URL: "localhost",
RoutingKey: influxdb.SecretField{Key: "bad-key"},
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "pagerduty routing key is invalid",
},
},
{
name: "empty webhook http method",
src: &endpoint.WebHook{
Base: goodBase,
URL: "localhost",
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook http method",
},
},
{
name: "empty webhook token",
src: &endpoint.WebHook{
Base: goodBase,
URL: "localhost",
Method: "GET",
AuthMethod: "bearer",
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook token for bearer auth",
},
},
{
name: "empty webhook username",
src: &endpoint.WebHook{
Base: goodBase,
URL: "localhost",
Method: http.MethodGet,
AuthMethod: "basic",
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook username/password for basic auth",
},
},
}
for _, c := range cases {
got := c.src.Valid()
influxTesting.ErrorsEqual(t, got, c.err)
}
}
var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
func TestJSON(t *testing.T) {
cases := []struct {
name string
src influxdb.NotificationEndpoint
}{
{
name: "simple Slack",
src: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
URL: "https://slack.com/api/chat.postMessage",
Token: influxdb.SecretField{Key: "token-key-1"},
},
},
{
name: "simple pagerduty",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
URL: "https://events.pagerduty.com/v2/enqueue",
RoutingKey: influxdb.SecretField{Key: "pagerduty-routing-key"},
},
},
{
name: "simple webhook",
src: &endpoint.WebHook{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
AuthMethod: "basic",
URL: "http://example.com",
Username: influxdb.SecretField{Key: "username-key"},
Password: influxdb.SecretField{Key: "password-key"},
},
},
}
for _, c := range cases {
b, err := json.Marshal(c.src)
if err != nil {
t.Fatalf("%s marshal failed, err: %s", c.name, err.Error())
}
got, err := endpoint.UnmarshalJSON(b)
if err != nil {
t.Fatalf("%s unmarshal failed, err: %s", c.name, err.Error())
}
if diff := cmp.Diff(got, c.src); diff != "" {
t.Errorf("failed %s, NotificationEndpoint are different -got/+want\ndiff %s", c.name, diff)
}
}
}
func TestBackFill(t *testing.T) {
cases := []struct {
name string
src influxdb.NotificationEndpoint
target influxdb.NotificationEndpoint
}{
{
name: "simple Slack",
src: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
URL: "https://slack.com/api/chat.postMessage",
Token: influxdb.SecretField{
Value: strPtr("token-value"),
},
},
target: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
URL: "https://slack.com/api/chat.postMessage",
Token: influxdb.SecretField{
Key: id1 + "-token",
Value: strPtr("token-value"),
},
},
},
{
name: "simple pagerduty",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
URL: "https://events.pagerduty.com/v2/enqueue",
RoutingKey: influxdb.SecretField{
Value: strPtr("routing-key-value"),
},
},
target: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
URL: "https://events.pagerduty.com/v2/enqueue",
RoutingKey: influxdb.SecretField{
Key: id1 + "-routing-key",
Value: strPtr("routing-key-value"),
},
},
},
{
name: "webhook with token",
src: &endpoint.WebHook{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
AuthMethod: "basic",
URL: "http://example.com",
Username: influxdb.SecretField{
Value: strPtr("username1"),
},
Password: influxdb.SecretField{
Value: strPtr("password1"),
},
},
target: &endpoint.WebHook{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16(id3),
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
},
},
AuthMethod: "basic",
URL: "http://example.com",
Username: influxdb.SecretField{
Key: id1 + "-username",
Value: strPtr("username1"),
},
Password: influxdb.SecretField{
Key: id1 + "-password",
Value: strPtr("password1"),
},
},
},
}
for _, c := range cases {
c.src.BackfillSecretKeys()
if diff := cmp.Diff(c.target, c.src); diff != "" {
t.Errorf("failed %s, NotificationEndpoint are different -got/+want\ndiff %s", c.name, diff)
}
}
}
func strPtr(s string) *string {
ss := new(string)
*ss = s
return ss
}

View File

@ -0,0 +1,83 @@
package endpoint
import (
"encoding/json"
"fmt"
"net/url"
"github.com/influxdata/influxdb"
)
var _ influxdb.NotificationEndpoint = &PagerDuty{}
const routingKeySuffix = "-routing-key"
// PagerDuty is the notification endpoint config of pagerduty.
type PagerDuty struct {
Base
// Path is the PagerDuty API URL, should not need to be changed.
URL string `json:"url"`
// RoutingKey is a version 4 UUID expressed as a 32-digit hexadecimal number.
// This is the Integration Key for an integration on any given service.
RoutingKey influxdb.SecretField `json:"routing-key"`
}
// BackfillSecretKeys fill back fill the secret field key during the unmarshalling
// if value of that secret field is not nil.
func (s *PagerDuty) BackfillSecretKeys() {
if s.RoutingKey.Key == "" && s.RoutingKey.Value != nil {
s.RoutingKey.Key = s.ID.String() + routingKeySuffix
}
}
// SecretFields return available secret fields.
func (s PagerDuty) SecretFields() []influxdb.SecretField {
return []influxdb.SecretField{
s.RoutingKey,
}
}
// Valid returns error if some configuration is invalid
func (s PagerDuty) Valid() error {
if err := s.Base.valid(); err != nil {
return err
}
if s.URL == "" {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "pagerduty endpoint URL is empty",
}
}
if _, err := url.Parse(s.URL); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("pagerduty endpoint URL is invalid: %s", err.Error()),
}
}
if s.RoutingKey.Key != s.ID.String()+routingKeySuffix {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "pagerduty routing key is invalid",
}
}
return nil
}
type pagerdutyAlias PagerDuty
// MarshalJSON implement json.Marshaler interface.
func (s PagerDuty) MarshalJSON() ([]byte, error) {
return json.Marshal(
struct {
pagerdutyAlias
Type string `json:"type"`
}{
pagerdutyAlias: pagerdutyAlias(s),
Type: s.Type(),
})
}
// Type returns the type.
func (s PagerDuty) Type() string {
return PagerDutyType
}

View File

@ -0,0 +1,84 @@
package endpoint
import (
"encoding/json"
"fmt"
"net/url"
"github.com/influxdata/influxdb"
)
var _ influxdb.NotificationEndpoint = &Slack{}
const slackTokenSuffix = "-token"
// Slack is the notification endpoint config of slack.
type Slack struct {
Base
// URL is a valid slack webhook URL
// TODO(jm): validate this in unmarshaler
// example: https://slack.com/api/chat.postMessage
URL string `json:"url"`
// Token is the bearer token for authorization
Token influxdb.SecretField `json:"token"`
}
// BackfillSecretKeys fill back fill the secret field key during the unmarshalling
// if value of that secret field is not nil.
func (s *Slack) BackfillSecretKeys() {
if s.Token.Key == "" && s.Token.Value != nil {
s.Token.Key = s.ID.String() + slackTokenSuffix
}
}
// SecretFields return available secret fields.
func (s Slack) SecretFields() []influxdb.SecretField {
return []influxdb.SecretField{
s.Token,
}
}
// Valid returns error if some configuration is invalid
func (s Slack) Valid() error {
if err := s.Base.valid(); err != nil {
return err
}
if s.URL == "" {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "slack endpoint URL is empty",
}
}
if _, err := url.Parse(s.URL); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("slack endpoint URL is invalid: %s", err.Error()),
}
}
if s.Token.Key != s.ID.String()+slackTokenSuffix {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "slack endpoint token is invalid",
}
}
return nil
}
type slackAlias Slack
// MarshalJSON implement json.Marshaler interface.
func (s Slack) MarshalJSON() ([]byte, error) {
return json.Marshal(
struct {
slackAlias
Type string `json:"type"`
}{
slackAlias: slackAlias(s),
Type: s.Type(),
})
}
// Type returns the type.
func (s Slack) Type() string {
return SlackType
}

View File

@ -0,0 +1,154 @@
package endpoint
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"github.com/influxdata/influxdb"
)
var _ influxdb.NotificationEndpoint = &WebHook{}
const (
webhookTokenSuffix = "-token"
webhookUsernameSuffix = "-username"
webhookPasswordSuffix = "-password"
)
// WebHook is the notification endpoint config of webhook.
type WebHook struct {
Base
// Path is the API path of WebHook
URL string `json:"url"`
// Token is the bearer token for authorization
Token influxdb.SecretField `json:"token,omitempty"`
Username influxdb.SecretField `json:"username,omitempty"`
Password influxdb.SecretField `json:"password,omitempty"`
AuthMethod string `json:"authmethod"`
Method string `json:"method"`
ContentTemplate string `json:"contentTemplate"`
}
// BackfillSecretKeys fill back fill the secret field key during the unmarshalling
// if value of that secret field is not nil.
func (s *WebHook) BackfillSecretKeys() {
if s.Token.Key == "" && s.Token.Value != nil {
s.Token.Key = s.ID.String() + webhookTokenSuffix
}
if s.Username.Key == "" && s.Username.Value != nil {
s.Username.Key = s.ID.String() + webhookUsernameSuffix
}
if s.Password.Key == "" && s.Password.Value != nil {
s.Password.Key = s.ID.String() + webhookPasswordSuffix
}
}
// SecretFields return available secret fields.
func (s WebHook) SecretFields() []influxdb.SecretField {
arr := make([]influxdb.SecretField, 0)
if s.Token.Key != "" {
arr = append(arr, s.Token)
}
if s.Username.Key != "" {
arr = append(arr, s.Username)
}
if s.Password.Key != "" {
arr = append(arr, s.Password)
}
return arr
}
var goodWebHookAuthMethod = map[string]bool{
"none": true,
"basic": true,
"bearer": true,
}
var goodHTTPMethod = map[string]bool{
http.MethodGet: true,
http.MethodPost: true,
http.MethodPut: true,
}
// Valid returns error if some configuration is invalid
func (s WebHook) Valid() error {
if err := s.Base.valid(); err != nil {
return err
}
if s.URL == "" {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "webhook endpoint URL is empty",
}
}
if _, err := url.Parse(s.URL); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("webhook endpoint URL is invalid: %s", err.Error()),
}
}
if !goodHTTPMethod[s.Method] {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook http method",
}
}
if !goodWebHookAuthMethod[s.AuthMethod] {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook auth method",
}
}
if s.AuthMethod == "basic" &&
(s.Username.Key != s.ID.String()+webhookUsernameSuffix ||
s.Password.Key != s.ID.String()+webhookPasswordSuffix) {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook username/password for basic auth",
}
}
if s.AuthMethod == "bearer" && s.Token.Key != webhookTokenSuffix {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "invalid webhook token for bearer auth",
}
}
return nil
}
type webhookAlias WebHook
// MarshalJSON implement json.Marshaler interface.
func (s WebHook) MarshalJSON() ([]byte, error) {
return json.Marshal(
struct {
webhookAlias
Type string `json:"type"`
}{
webhookAlias: webhookAlias(s),
Type: s.Type(),
})
}
// Type returns the type.
func (s WebHook) Type() string {
return WebhookType
}
// ParseResponse will parse the http response from webhook.
func (s WebHook) ParseResponse(resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return &influxdb.Error{
Msg: string(body),
}
}
return nil
}

123
notification_endpoint.go Normal file
View File

@ -0,0 +1,123 @@
package influxdb
import (
"context"
"encoding/json"
"errors"
)
var (
// ErrInvalidNotificationEndpointType denotes that the provided NotificationEndpoint is not a valid type
ErrInvalidNotificationEndpointType = errors.New("unknown notification endpoint type")
)
// NotificationEndpoint is the configuration describing
// how to call a 3rd party service. E.g. Slack, Pagerduty
type NotificationEndpoint interface {
Valid() error
Type() string
json.Marshaler
Updater
Getter
// SecretFields return available secret fields.
SecretFields() []SecretField
// BackfillSecretKeys fill back fill the secret field key during the unmarshalling
// if value of that secret field is not nil.
BackfillSecretKeys()
}
// ops for checks error
var (
OpFindNotificationEndpointByID = "FindNotificationEndpointByID"
OpFindNotificationEndpoint = "FindNotificationEndpoint"
OpFindNotificationEndpoints = "FindNotificationEndpoints"
OpCreateNotificationEndpoint = "CreateNotificationEndpoint"
OpUpdateNotificationEndpoint = "UpdateNotificationEndpoint"
OpDeleteNotificationEndpoint = "DeleteNotificationEndpoint"
)
// NotificationEndpointFilter represents a set of filter that restrict the returned notification endpoints.
type NotificationEndpointFilter struct {
ID *ID
OrgID *ID
Org *string
}
// QueryParams Converts NotificationEndpointFilter fields to url query params.
func (f NotificationEndpointFilter) QueryParams() map[string][]string {
qp := map[string][]string{}
if f.OrgID != nil {
qp["orgID"] = []string{f.OrgID.String()}
}
if f.Org != nil {
qp["org"] = []string{*f.Org}
}
return qp
}
// NotificationEndpointUpdate is the set of upgrade fields for patch request.
type NotificationEndpointUpdate struct {
Name *string `json:"name,omitempty"`
Description *string `json:"description,omitempty"`
Status *Status `json:"status,omitempty"`
}
// Valid will verify if the NotificationEndpointUpdate is valid.
func (n *NotificationEndpointUpdate) Valid() error {
if n.Name != nil && *n.Name == "" {
return &Error{
Code: EInvalid,
Msg: "Notification Endpoint Name can't be empty",
}
}
if n.Description != nil && *n.Description == "" {
return &Error{
Code: EInvalid,
Msg: "Notification Endpoint Description can't be empty",
}
}
if n.Status != nil {
if err := n.Status.Valid(); err != nil {
return err
}
}
return nil
}
// NotificationEndpointService represents a service for managing notification endpoints.
type NotificationEndpointService interface {
// UserResourceMappingService must be part of all NotificationEndpointStore service,
// for create, delete.
UserResourceMappingService
// OrganizationService is needed for search filter
OrganizationService
// SecretService is needed to check if the secret key exists.
SecretService
// FindNotificationEndpointByID returns a single notification endpoint by ID.
FindNotificationEndpointByID(ctx context.Context, id ID) (NotificationEndpoint, error)
// FindNotificationEndpoints returns a list of notification endpoints that match filter and the total count of matching notification endpoints.
// Additional options provide pagination & sorting.
FindNotificationEndpoints(ctx context.Context, filter NotificationEndpointFilter, opt ...FindOptions) ([]NotificationEndpoint, int, error)
// CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier.
CreateNotificationEndpoint(ctx context.Context, ne NotificationEndpoint, userID ID) error
// UpdateNotificationEndpointUpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
UpdateNotificationEndpoint(ctx context.Context, id ID, nr NotificationEndpoint, userID ID) (NotificationEndpoint, error)
// PatchNotificationEndpoint updates a single notification endpoint with changeset.
// Returns the new notification endpoint state after update.
PatchNotificationEndpoint(ctx context.Context, id ID, upd NotificationEndpointUpdate) (NotificationEndpoint, error)
// DeleteNotificationEndpoint removes a notification endpoint by ID.
DeleteNotificationEndpoint(ctx context.Context, id ID) error
}

View File

@ -1,6 +1,10 @@
package influxdb package influxdb
import "context" import (
"context"
"encoding/json"
"strings"
)
// ErrSecretNotFound is the error msg for a missing secret. // ErrSecretNotFound is the error msg for a missing secret.
const ErrSecretNotFound = "secret not found" const ErrSecretNotFound = "secret not found"
@ -25,3 +29,46 @@ type SecretService interface {
// DeleteSecret removes a single secret from the secret store. // DeleteSecret removes a single secret from the secret store.
DeleteSecret(ctx context.Context, orgID ID, ks ...string) error DeleteSecret(ctx context.Context, orgID ID, ks ...string) error
} }
// SecretField contains a key string, and value pointer.
type SecretField struct {
Key string `json:"key"`
Value *string `json:"value,omitempty"`
}
// String returns the key of the secret.
func (s SecretField) String() string {
if s.Key == "" {
return ""
}
return "secret: " + string(s.Key)
}
// MarshalJSON implement the json marshaler interface.
func (s SecretField) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
// UnmarshalJSON implement the json unmarshaler interface.
func (s *SecretField) UnmarshalJSON(b []byte) error {
var ss string
if err := json.Unmarshal(b, &ss); err != nil {
return err
}
if ss == "" {
s.Key = ""
return nil
}
if strings.HasPrefix(ss, "secret: ") {
s.Key = ss[len("secret: "):]
} else {
s.Value = strPtr(ss)
}
return nil
}
func strPtr(s string) *string {
ss := new(string)
*ss = s
return ss
}

61
secret_test.go Normal file
View File

@ -0,0 +1,61 @@
package influxdb
import (
"encoding/json"
"testing"
"github.com/google/go-cmp/cmp"
)
func TestSecretFieldJSON(t *testing.T) {
cases := []struct {
name string
fld *SecretField
json string
target SecretField
}{
{
name: "regular",
fld: &SecretField{Key: "some key"},
json: `"secret: some key"`,
target: SecretField{Key: "some key"},
},
{name: "blank", fld: &SecretField{}, json: `""`},
{
name: "with value",
fld: &SecretField{
Key: "some key",
Value: strPtr("some value"),
},
json: `"secret: some key"`,
target: SecretField{
Key: "some key",
},
},
{
name: "unmarshal a post",
json: `"some value"`,
target: SecretField{
Value: strPtr("some value"),
},
},
}
for _, c := range cases {
if c.fld != nil {
serialized, err := json.Marshal(c.fld)
if err != nil {
t.Fatalf("%s failed, secret key marshal err: %q", c.name, err.Error())
}
if string(serialized) != c.json {
t.Fatalf("%s failed, secret key marshal result is unexpected, got %q, want %q", c.name, string(serialized), c.json)
}
}
var deserialized SecretField
if err := json.Unmarshal([]byte(c.json), &deserialized); err != nil {
t.Fatalf("%s failed, secret key unmarshal err: %q", c.name, err.Error())
}
if diff := cmp.Diff(deserialized, c.target); diff != "" {
t.Fatalf("%s failed, secret key unmarshal result is unexpected, diff %s", c.name, diff)
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -22,10 +22,6 @@ type NotificationRuleFields struct {
UserResourceMappings []*influxdb.UserResourceMapping UserResourceMappings []*influxdb.UserResourceMapping
} }
var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
var time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC)
var notificationRuleCmpOptions = cmp.Options{ var notificationRuleCmpOptions = cmp.Options{
cmp.Transformer("Sort", func(in []influxdb.NotificationRule) []influxdb.NotificationRule { cmp.Transformer("Sort", func(in []influxdb.NotificationRule) []influxdb.NotificationRule {
out := append([]influxdb.NotificationRule(nil), in...) out := append([]influxdb.NotificationRule(nil), in...)