feat(kv): add notification endpoint
Co-authored-by: Jade McGough <jade@thezets.com>pull/14639/head
parent
f683f0dc46
commit
61628671ed
|
@ -0,0 +1,544 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
"github.com/influxdata/influxdb/notification/endpoint"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
|
||||
var (
|
||||
notificationEndpointBucket = []byte("notificationEndpointv1")
|
||||
notificationEndpointIndex = []byte("notificationEndpointIndexv1")
|
||||
|
||||
// ErrNotificationEndpointNotFound is used when the notification endpoint is not found.
|
||||
ErrNotificationEndpointNotFound = &influxdb.Error{
|
||||
Msg: "notification endpoint not found",
|
||||
Code: influxdb.ENotFound,
|
||||
}
|
||||
|
||||
// ErrInvalidNotificationEndpointID is used when the service was provided
|
||||
// an invalid ID format.
|
||||
ErrInvalidNotificationEndpointID = &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "provided notification endpoint ID has invalid format",
|
||||
}
|
||||
)
|
||||
|
||||
var _ influxdb.NotificationEndpointService = (*Service)(nil)
|
||||
|
||||
func (s *Service) initializeNotificationEndpoint(ctx context.Context, tx Tx) error {
|
||||
if _, err := s.notificationEndpointBucket(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.notificationEndpointIndexBucket(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnavailableNotificationEndpointStoreError is used if we aren't able to interact with the
|
||||
// store, it means the store is not available at the moment (e.g. network).
|
||||
func UnavailableNotificationEndpointStoreError(err error) *influxdb.Error {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInternal,
|
||||
Msg: fmt.Sprintf("Unable to connect to notification endpoint store service. Please try again; Err: %v", err),
|
||||
Op: "kv/notificationEndpoint",
|
||||
}
|
||||
}
|
||||
|
||||
// UnavailableNotificationEndpointIndexError is used when the error comes from an internal system.
|
||||
func UnavailableNotificationEndpointIndexError(err error) *influxdb.Error {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInternal,
|
||||
Msg: fmt.Sprintf("unexpected error retrieving notification endpoint's index bucket; Err %v", err),
|
||||
Op: "kv/notificationEndpointIndex",
|
||||
}
|
||||
}
|
||||
|
||||
// InternalNotificationEndpointStoreError is used when the error comes from an
|
||||
// internal system.
|
||||
func InternalNotificationEndpointStoreError(err error) *influxdb.Error {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInternal,
|
||||
Msg: fmt.Sprintf("Unknown internal notification endpoint data error; Err: %v", err),
|
||||
Op: "kv/notificationEndpoint",
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) notificationEndpointBucket(tx Tx) (Bucket, error) {
|
||||
b, err := tx.Bucket(notificationEndpointBucket)
|
||||
if err != nil {
|
||||
return nil, UnavailableNotificationEndpointStoreError(err)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (s *Service) notificationEndpointIndexBucket(tx Tx) (Bucket, error) {
|
||||
b, err := tx.Bucket(notificationEndpointIndex)
|
||||
if err != nil {
|
||||
return nil, UnavailableNotificationEndpointIndexError(err)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier.
|
||||
func (s *Service) CreateNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
|
||||
return s.kv.Update(ctx, func(tx Tx) error {
|
||||
return s.createNotificationEndpoint(ctx, tx, edp, userID)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
|
||||
if edp.GetOrgID().Valid() {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
_, pe := s.findOrganizationByID(ctx, tx, edp.GetOrgID())
|
||||
if pe != nil {
|
||||
return &influxdb.Error{
|
||||
Op: influxdb.OpCreateCheck,
|
||||
Err: pe,
|
||||
}
|
||||
}
|
||||
}
|
||||
// notification endpoint name unique
|
||||
if _, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), edp.GetName()); err == nil {
|
||||
if err == nil {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EConflict,
|
||||
Msg: fmt.Sprintf("notification endpoint with name %s already exists", edp.GetName()),
|
||||
}
|
||||
}
|
||||
}
|
||||
id := s.IDGenerator.ID()
|
||||
edp.SetID(id)
|
||||
now := s.TimeGenerator.Now()
|
||||
edp.SetCreatedAt(now)
|
||||
edp.SetUpdatedAt(now)
|
||||
edp.BackfillSecretKeys()
|
||||
|
||||
for _, fld := range edp.SecretFields() {
|
||||
if fld.Value != nil {
|
||||
if err := s.putSecret(ctx, tx, edp.GetOrgID(),
|
||||
fld.Key, *fld.Value); err != nil {
|
||||
return InternalNotificationEndpointStoreError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.putNotificationEndpoint(ctx, tx, edp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
urm := &influxdb.UserResourceMapping{
|
||||
ResourceID: id,
|
||||
UserID: userID,
|
||||
UserType: influxdb.Owner,
|
||||
ResourceType: influxdb.NotificationEndpointResourceType,
|
||||
}
|
||||
return s.createUserResourceMapping(ctx, tx, urm)
|
||||
}
|
||||
|
||||
func (s *Service) findNotificationEndpointByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (influxdb.NotificationEndpoint, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
key, err := notificationEndpointIndexKey(orgID, n)
|
||||
if err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
idx, err := s.notificationEndpointIndexBucket(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf, err := idx.Get(key)
|
||||
if IsNotFound(err) {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Msg: fmt.Sprintf("notification endpoint %q not found", n),
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var id influxdb.ID
|
||||
if err := id.Decode(buf); err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
edp, _, _, err := s.findNotificationEndpointByID(ctx, tx, id)
|
||||
return edp, err
|
||||
}
|
||||
|
||||
// UpdateNotificationEndpoint updates a single notification endpoint.
|
||||
// Returns the new notification endpoint after update.
|
||||
func (s *Service) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
|
||||
var err error
|
||||
err = s.kv.Update(ctx, func(tx Tx) error {
|
||||
edp, err = s.updateNotificationEndpoint(ctx, tx, id, edp, userID)
|
||||
return err
|
||||
})
|
||||
return edp, err
|
||||
}
|
||||
|
||||
func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
|
||||
current, _, _, err := s.findNotificationEndpointByID(ctx, tx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if edp.GetName() != current.GetName() {
|
||||
edp0, err := s.findNotificationEndpointByName(ctx, tx, current.GetOrgID(), edp.GetName())
|
||||
if err == nil && edp0.GetID() != id {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EConflict,
|
||||
Msg: "notification endpoint name is not unique",
|
||||
}
|
||||
}
|
||||
key, err := notificationEndpointIndexKey(current.GetOrgID(), current.GetName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
idx, err := s.notificationEndpointIndexBucket(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := idx.Delete(key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// ID and OrganizationID can not be updated
|
||||
edp.SetID(current.GetID())
|
||||
edp.SetOrgID(current.GetOrgID())
|
||||
edp.SetCreatedAt(current.GetCRUDLog().CreatedAt)
|
||||
edp.SetUpdatedAt(s.TimeGenerator.Now())
|
||||
|
||||
edp.BackfillSecretKeys()
|
||||
for _, fld := range edp.SecretFields() {
|
||||
if fld.Value != nil {
|
||||
if err = s.putSecret(ctx, tx, edp.GetOrgID(),
|
||||
fld.Key, *fld.Value); err != nil {
|
||||
return nil, InternalNotificationEndpointStoreError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = s.putNotificationEndpoint(ctx, tx, edp)
|
||||
return edp, err
|
||||
}
|
||||
|
||||
// PatchNotificationEndpoint updates a single notification endpoint with changeset.
|
||||
// Returns the new notification endpoint state after update.
|
||||
func (s *Service) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
|
||||
var edp influxdb.NotificationEndpoint
|
||||
if err := s.kv.Update(ctx, func(tx Tx) (err error) {
|
||||
edp, err = s.patchNotificationEndpoint(ctx, tx, id, upd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return edp, nil
|
||||
}
|
||||
|
||||
func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
|
||||
edp, _, _, err := s.findNotificationEndpointByID(ctx, tx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if upd.Name != nil {
|
||||
edp0, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), *upd.Name)
|
||||
if err == nil && edp0.GetID() != id {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EConflict,
|
||||
Msg: "notification endpoint name is not unique",
|
||||
}
|
||||
}
|
||||
key, err := notificationEndpointIndexKey(edp.GetOrgID(), edp.GetName())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
idx, err := s.notificationEndpointIndexBucket(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := idx.Delete(key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if upd.Name != nil {
|
||||
edp.SetName(*upd.Name)
|
||||
}
|
||||
if upd.Description != nil {
|
||||
edp.SetDescription(*upd.Description)
|
||||
}
|
||||
if upd.Status != nil {
|
||||
edp.SetStatus(*upd.Status)
|
||||
}
|
||||
edp.SetUpdatedAt(s.TimeGenerator.Now())
|
||||
err = s.putNotificationEndpoint(ctx, tx, edp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return edp, nil
|
||||
}
|
||||
|
||||
// PutNotificationEndpoint put a notification endpoint to storage.
|
||||
func (s *Service) PutNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint) error {
|
||||
return s.kv.Update(ctx, func(tx Tx) (err error) {
|
||||
return s.putNotificationEndpoint(ctx, tx, edp)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) putNotificationEndpoint(ctx context.Context, tx Tx, edp influxdb.NotificationEndpoint) error {
|
||||
if err := edp.Valid(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, k := range edp.SecretFields() {
|
||||
if _, err := s.loadSecret(ctx, tx, edp.GetOrgID(), string(k.Key)); err != nil {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "Unable to locate secret key: " + string(k.Key),
|
||||
}
|
||||
}
|
||||
}
|
||||
encodedID, _ := edp.GetID().Encode()
|
||||
|
||||
v, err := json.Marshal(edp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key, pe := notificationEndpointIndexKey(edp.GetOrgID(), edp.GetName())
|
||||
if err != nil {
|
||||
return pe
|
||||
}
|
||||
|
||||
idx, err := s.notificationEndpointIndexBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := idx.Put(key, encodedID); err != nil {
|
||||
return &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
bucket, err := s.notificationEndpointBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := bucket.Put(encodedID, v); err != nil {
|
||||
return UnavailableNotificationEndpointStoreError(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// notificationEndpointIndexKey is a combination of the orgID and the notification endpoint name.
|
||||
func notificationEndpointIndexKey(orgID influxdb.ID, name string) ([]byte, error) {
|
||||
orgIDEncoded, err := orgID.Encode()
|
||||
if err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
k := make([]byte, influxdb.IDLength+len(name))
|
||||
copy(k, orgIDEncoded)
|
||||
copy(k[influxdb.IDLength:], []byte(name))
|
||||
return k, nil
|
||||
}
|
||||
|
||||
// FindNotificationEndpointByID returns a single notification endpoint by ID.
|
||||
func (s *Service) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
|
||||
var (
|
||||
edp influxdb.NotificationEndpoint
|
||||
err error
|
||||
)
|
||||
|
||||
err = s.kv.View(ctx, func(tx Tx) error {
|
||||
edp, _, _, err = s.findNotificationEndpointByID(ctx, tx, id)
|
||||
return err
|
||||
})
|
||||
|
||||
return edp, err
|
||||
}
|
||||
|
||||
func (s *Service) findNotificationEndpointByID(ctx context.Context, tx Tx,
|
||||
id influxdb.ID) (edp influxdb.NotificationEndpoint, encID []byte, bucket Bucket, err error) {
|
||||
encID, err = id.Encode()
|
||||
if err != nil {
|
||||
return nil, encID, bucket, ErrInvalidNotificationEndpointID
|
||||
}
|
||||
|
||||
bucket, err = s.notificationEndpointBucket(tx)
|
||||
if err != nil {
|
||||
return nil, encID, bucket, err
|
||||
}
|
||||
|
||||
v, err := bucket.Get(encID)
|
||||
if IsNotFound(err) {
|
||||
return nil, encID, bucket, ErrNotificationEndpointNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, encID, bucket, InternalNotificationEndpointStoreError(err)
|
||||
}
|
||||
|
||||
edp, err = endpoint.UnmarshalJSON(v)
|
||||
return edp, encID, bucket, err
|
||||
}
|
||||
|
||||
// FindNotificationEndpoints returns a list of notification endpoints that match filter and the total count of matching notification endpoints.
|
||||
// Additional options provide pagination & sorting.
|
||||
func (s *Service) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) (edps []influxdb.NotificationEndpoint, n int, err error) {
|
||||
err = s.kv.View(ctx, func(tx Tx) error {
|
||||
edps, n, err = s.findNotificationEndpoints(ctx, tx, filter)
|
||||
return err
|
||||
})
|
||||
return edps, n, err
|
||||
}
|
||||
|
||||
func (s *Service) findNotificationEndpoints(ctx context.Context, tx Tx, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
|
||||
edps := make([]influxdb.NotificationEndpoint, 0)
|
||||
|
||||
if filter.Org != nil {
|
||||
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
|
||||
if err != nil {
|
||||
return nil, 0, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
filter.OrgID = &o.ID
|
||||
}
|
||||
|
||||
var offset, limit, count int
|
||||
var descending bool
|
||||
if len(opt) > 0 {
|
||||
offset = opt[0].Offset
|
||||
limit = opt[0].Limit
|
||||
descending = opt[0].Descending
|
||||
}
|
||||
filterFn := filterNotificationEndpointsFn(filter)
|
||||
err := s.forEachNotificationEndpoint(ctx, tx, descending, func(edp influxdb.NotificationEndpoint) bool {
|
||||
if filterFn(edp) {
|
||||
if count >= offset {
|
||||
edps = append(edps, edp)
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
if limit > 0 && len(edps) >= limit {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return edps, len(edps), err
|
||||
}
|
||||
|
||||
// forEachNotificationEndpoint will iterate through all notification endpoints while fn returns true.
|
||||
func (s *Service) forEachNotificationEndpoint(ctx context.Context, tx Tx, descending bool, fn func(influxdb.NotificationEndpoint) bool) error {
|
||||
|
||||
bkt, err := s.notificationEndpointBucket(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cur, err := bkt.Cursor()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var k, v []byte
|
||||
if descending {
|
||||
k, v = cur.Last()
|
||||
} else {
|
||||
k, v = cur.First()
|
||||
}
|
||||
|
||||
for k != nil {
|
||||
edp, err := endpoint.UnmarshalJSON(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !fn(edp) {
|
||||
break
|
||||
}
|
||||
|
||||
if descending {
|
||||
k, v = cur.Prev()
|
||||
} else {
|
||||
k, v = cur.Next()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func filterNotificationEndpointsFn(filter influxdb.NotificationEndpointFilter) func(edp influxdb.NotificationEndpoint) bool {
|
||||
return func(edp influxdb.NotificationEndpoint) bool {
|
||||
if filter.ID != nil {
|
||||
if edp.GetID() != *filter.ID {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if filter.OrgID != nil {
|
||||
if edp.GetOrgID() != *filter.OrgID {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteNotificationEndpoint removes a notification endpoint by ID.
|
||||
func (s *Service) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) error {
|
||||
return s.kv.Update(ctx, func(tx Tx) error {
|
||||
return s.deleteNotificationEndpoint(ctx, tx, id)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||
edp, encID, bucket, err := s.findNotificationEndpointByID(ctx, tx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = bucket.Delete(encID); err != nil {
|
||||
return InternalNotificationEndpointStoreError(err)
|
||||
}
|
||||
|
||||
for _, fld := range edp.SecretFields() {
|
||||
if err := s.deleteSecret(ctx, tx, edp.GetOrgID(), fld.Key); err != nil {
|
||||
InternalNotificationEndpointStoreError(err)
|
||||
}
|
||||
}
|
||||
|
||||
return s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
|
||||
ResourceID: id,
|
||||
ResourceType: influxdb.NotificationEndpointResourceType,
|
||||
})
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
package kv_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
influxdbtesting "github.com/influxdata/influxdb/testing"
|
||||
)
|
||||
|
||||
func TestBoltNotificationEndpointService(t *testing.T) {
|
||||
influxdbtesting.NotificationEndpointService(initBoltNotificationEndpointService, t)
|
||||
}
|
||||
|
||||
func TestNotificationEndpointService(t *testing.T) {
|
||||
influxdbtesting.NotificationEndpointService(initInmemNotificationEndpointService, t)
|
||||
}
|
||||
|
||||
func initBoltNotificationEndpointService(f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
|
||||
svc, closeSvc := initNotificationEndpointService(s, f, t)
|
||||
return svc, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func initInmemNotificationEndpointService(f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, func()) {
|
||||
s, closeInmem, err := NewTestInmemStore()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
|
||||
svc, closeSvc := initNotificationEndpointService(s, f, t)
|
||||
return svc, func() {
|
||||
closeSvc()
|
||||
closeInmem()
|
||||
}
|
||||
}
|
||||
|
||||
func initNotificationEndpointService(s kv.Store, f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, func()) {
|
||||
svc := kv.NewService(s)
|
||||
svc.IDGenerator = f.IDGenerator
|
||||
svc.TimeGenerator = f.TimeGenerator
|
||||
if f.TimeGenerator == nil {
|
||||
svc.TimeGenerator = influxdb.RealTimeGenerator{}
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if err := svc.Initialize(ctx); err != nil {
|
||||
t.Fatalf("error initializing user service: %v", err)
|
||||
}
|
||||
|
||||
for _, s := range f.Secrets {
|
||||
for k, v := range s.Env {
|
||||
if err := svc.PutSecret(ctx, s.OrganizationID, k, v); err != nil {
|
||||
t.Fatalf("failed to populate secrets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, edp := range f.NotificationEndpoints {
|
||||
if err := svc.PutNotificationEndpoint(ctx, edp); err != nil {
|
||||
t.Fatalf("failed to populate notification endpoint: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, o := range f.Orgs {
|
||||
if err := svc.PutOrganization(ctx, o); err != nil {
|
||||
t.Fatalf("failed to populate org: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range f.UserResourceMappings {
|
||||
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
|
||||
t.Fatalf("failed to populate user resource mapping: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return svc, func() {
|
||||
for _, edp := range f.NotificationEndpoints {
|
||||
if err := svc.DeleteNotificationEndpoint(ctx, edp.GetID()); err != nil {
|
||||
t.Logf("failed to remove notification endpoint: %v", err)
|
||||
}
|
||||
}
|
||||
for _, o := range f.Orgs {
|
||||
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
|
||||
t.Fatalf("failed to remove org: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, urm := range f.UserResourceMappings {
|
||||
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
|
||||
t.Logf("failed to remove urm rule: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, s := range f.Secrets {
|
||||
for k, v := range s.Env {
|
||||
if err := svc.DeleteSecret(ctx, s.OrganizationID, k, v); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
|
||||
t.Fatalf("failed to populate secrets")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -134,6 +134,10 @@ func (s *Service) Initialize(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := s.initializeNotificationEndpoint(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.initializeUsers(ctx, tx)
|
||||
})
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -22,10 +22,6 @@ type NotificationRuleFields struct {
|
|||
UserResourceMappings []*influxdb.UserResourceMapping
|
||||
}
|
||||
|
||||
var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
|
||||
var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
|
||||
var time3 = time.Date(2006, time.July, 15, 5, 23, 53, 10, time.UTC)
|
||||
|
||||
var notificationRuleCmpOptions = cmp.Options{
|
||||
cmp.Transformer("Sort", func(in []influxdb.NotificationRule) []influxdb.NotificationRule {
|
||||
out := append([]influxdb.NotificationRule(nil), in...)
|
||||
|
|
Loading…
Reference in New Issue