refactor(notifications): isolate endpoint service (#19818)

* refactor(notifications): isolate endpoint service

Following the ongoing effort to isolate behaviours into their own
packages and off of kv.Service, this change move the notification
endpoints service implementation into its own package. It removes the
endpoint behaviors from the kv service completely.

* chore(influxd): wire up the isolated check service in place of kv service
pull/19846/head
George 2020-10-28 15:22:14 +00:00 committed by GitHub
parent 9dd37a1d1b
commit 78cafa861b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 965 additions and 3876 deletions

View File

@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
@ -46,6 +45,7 @@ import (
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
ruleservice "github.com/influxdata/influxdb/v2/notification/rule/service"
"github.com/influxdata/influxdb/v2/pkger"
infprom "github.com/influxdata/influxdb/v2/prometheus"
@ -760,17 +760,16 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(m.boltClient)
var (
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationEndpointStore platform.NotificationEndpointService = m.kvService
variableSvc platform.VariableService = m.kvService
sourceSvc platform.SourceService = m.kvService
dashboardSvc platform.DashboardService = m.kvService
dashboardLogSvc platform.DashboardOperationLogService = m.kvService
userLogSvc platform.UserOperationLogService = m.kvService
bucketLogSvc platform.BucketOperationLogService = m.kvService
orgLogSvc platform.OrganizationOperationLogService = m.kvService
scraperTargetSvc platform.ScraperTargetStoreService = m.kvService
telegrafSvc platform.TelegrafConfigStore = m.kvService
lookupSvc platform.LookupService = m.kvService
)
tenantStore := tenant.NewStore(m.kvStore)
@ -989,10 +988,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
checkSvc = middleware.NewCheckService(checkSvc, m.kvService, coordinator)
}
var notificationEndpointSvc platform.NotificationEndpointService
{
notificationEndpointSvc = endpointservice.New(endpointservice.NewStore(m.kvStore), secretSvc)
}
var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc, err = ruleservice.NewRuleService(m.log, m.kvStore, m.kvService, ts.OrganizationService, m.kvService)
notificationRuleSvc, err = ruleservice.New(m.log, m.kvStore, m.kvService, ts.OrganizationService, notificationEndpointSvc)
if err != nil {
return err
}
@ -1146,7 +1150,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
VariableFinder: variableSvc,
TargetFinder: scraperTargetSvc,
CheckFinder: checkSvc,
NotificationEndpointFinder: notificationEndpointStore,
NotificationEndpointFinder: notificationEndpointSvc,
NotificationRuleFinder: notificationRuleSvc,
}
@ -1191,7 +1195,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
TaskService: taskSvc,
TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UserResourceMappingService, ts.OrganizationService),
NotificationEndpointService: notificationEndpointSvc,
CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
@ -1529,6 +1533,11 @@ func (m *Launcher) TaskControlService() taskbackend.TaskControlService {
return m.taskControlService
}
// CheckService returns the internal check service.
func (m *Launcher) CheckService() platform.CheckService {
return m.apibackend.CheckService
}
// KeyValueService returns the internal key-value service.
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService

View File

@ -367,10 +367,6 @@ func (tl *TestLauncher) BucketService(tb testing.TB) *http.BucketService {
return &http.BucketService{Client: tl.HTTPClient(tb)}
}
func (tl *TestLauncher) CheckService() influxdb.CheckService {
return tl.kvService
}
func (tl *TestLauncher) DashboardService(tb testing.TB) *http.DashboardService {
tb.Helper()
return &http.DashboardService{Client: tl.HTTPClient(tb)}

View File

@ -26,7 +26,6 @@ type NotificationEndpointBackend struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}
// NewNotificationEndpointBackend returns a new instance of NotificationEndpointBackend.
@ -38,7 +37,6 @@ func NewNotificationEndpointBackend(log *zap.Logger, b *APIBackend) *Notificatio
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
}
@ -56,7 +54,6 @@ type NotificationEndpointHandler struct {
UserResourceMappingService influxdb.UserResourceMappingService
LabelService influxdb.LabelService
UserService influxdb.UserService
OrganizationService influxdb.OrganizationService
}
const (
@ -81,7 +78,6 @@ func NewNotificationEndpointHandler(log *zap.Logger, b *NotificationEndpointBack
UserResourceMappingService: b.UserResourceMappingService,
LabelService: b.LabelService,
UserService: b.UserService,
OrganizationService: b.OrganizationService,
}
h.HandlerFunc("POST", prefixNotificationEndpoints, h.handlePostNotificationEndpoint)
h.HandlerFunc("GET", prefixNotificationEndpoints, h.handleGetNotificationEndpoints)
@ -551,20 +547,12 @@ func (h *NotificationEndpointHandler) handleDeleteNotificationEndpoint(w http.Re
// NotificationEndpointService is an http client for the influxdb.NotificationEndpointService server implementation.
type NotificationEndpointService struct {
Client *httpc.Client
*UserResourceMappingService
*OrganizationService
}
// NewNotificationEndpointService constructs a new http NotificationEndpointService.
func NewNotificationEndpointService(client *httpc.Client) *NotificationEndpointService {
return &NotificationEndpointService{
Client: client,
UserResourceMappingService: &UserResourceMappingService{
Client: client,
},
OrganizationService: &OrganizationService{
Client: client,
},
}
}

View File

@ -18,7 +18,10 @@ import (
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/endpoint/service"
endpointTesting "github.com/influxdata/influxdb/v2/notification/endpoint/service/testing"
"github.com/influxdata/influxdb/v2/pkg/testttp"
"github.com/influxdata/influxdb/v2/tenant"
influxTesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
@ -32,7 +35,6 @@ func NewMockNotificationEndpointBackend(t *testing.T) *NotificationEndpointBacke
UserResourceMappingService: mock.NewUserResourceMappingService(),
LabelService: mock.NewLabelService(),
UserService: mock.NewUserService(),
OrganizationService: mock.NewOrganizationService(),
}
}
@ -489,7 +491,6 @@ func TestService_handlePostNotificationEndpoint(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
notificationEndpointBackend := NewMockNotificationEndpointBackend(t)
notificationEndpointBackend.NotificationEndpointService = tt.fields.NotificationEndpointService
notificationEndpointBackend.OrganizationService = tt.fields.OrganizationService
testttp.
PostJSON(t, prefixNotificationEndpoints, tt.args.endpoint).
@ -1060,37 +1061,40 @@ func TestService_handlePostNotificationEndpointOwner(t *testing.T) {
}
}
func initNotificationEndpointService(f influxTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
func initNotificationEndpointService(f endpointTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
store := NewTestInmemStore(t)
logger := zaptest.NewLogger(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
kvSvc := kv.NewService(logger, store)
kvSvc.IDGenerator = f.IDGenerator
kvSvc.TimeGenerator = f.TimeGenerator
for _, v := range f.Orgs {
if err := svc.PutOrganization(ctx, v); err != nil {
t.Fatalf("failed to replace org: %v", err)
}
}
endpointStore := service.NewStore(store)
endpointStore.IDGenerator = f.IDGenerator
endpointStore.TimeGenerator = f.TimeGenerator
endpointService := service.New(endpointStore, kvSvc)
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
tenantStore := tenant.NewStore(store)
tenantService := tenant.NewService(tenantStore)
for _, o := range f.Orgs {
withOrgID(tenantStore, o.ID, func() {
if err := tenantService.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
})
}
for _, v := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, v); err != nil {
if err := endpointStore.PutNotificationEndpoint(ctx, v); err != nil {
t.Fatalf("failed to update endpoint: %v", err)
}
}
fakeBackend := NewMockNotificationEndpointBackend(t)
fakeBackend.NotificationEndpointService = svc
fakeBackend.UserService = svc
fakeBackend.UserResourceMappingService = svc
fakeBackend.OrganizationService = svc
fakeBackend.NotificationEndpointService = endpointService
fakeBackend.UserResourceMappingService = tenantService
fakeBackend.UserService = tenantService
handler := NewNotificationEndpointHandler(zaptest.NewLogger(t), fakeBackend)
auth := func(next http.Handler) http.HandlerFunc {
@ -1103,7 +1107,7 @@ func initNotificationEndpointService(f influxTesting.NotificationEndpointFields,
done := server.Close
client := mustNewHTTPClient(t, server.URL, "")
return NewNotificationEndpointService(client), svc, done
return NewNotificationEndpointService(client), kvSvc, done
}
func TestNotificationEndpointService(t *testing.T) {
@ -1111,11 +1115,11 @@ func TestNotificationEndpointService(t *testing.T) {
tests := []struct {
name string
testFn func(init func(influxTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
testFn func(init func(endpointTesting.NotificationEndpointFields, *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()), t *testing.T)
}{
{
name: "CreateNotificationEndpoint",
testFn: influxTesting.CreateNotificationEndpoint,
testFn: endpointTesting.CreateNotificationEndpoint,
},
}
@ -1131,3 +1135,12 @@ func authCtxFn(userID influxdb.ID) func(context.Context) context.Context {
return pcontext.SetAuthorizer(ctx, &influxdb.Session{UserID: userID})
}
}
func withOrgID(store *tenant.Store, orgID influxdb.ID, fn func()) {
backup := store.OrgIDGen
defer func() { store.OrgIDGen = backup }()
store.OrgIDGen = mock.NewStaticIDGenerator(orgID)
fn()
}

View File

@ -1,512 +0,0 @@
package kv
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/notification/check"
)
var _ influxdb.CheckService = (*Service)(nil)
var (
checkBucket = []byte("checksv1")
checkIndexBucket = []byte("checkindexv1")
)
func newCheckStore() *IndexStore {
const resource = "check"
var decEndpointEntFn DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) {
ch, err := check.UnmarshalJSON(val)
return key, ch, err
}
var decValToEntFn ConvertValToEntFn = func(_ []byte, v interface{}) (Entity, error) {
ch, ok := v.(influxdb.Check)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
return Entity{
PK: EncID(ch.GetID()),
UniqueKey: Encode(EncID(ch.GetOrgID()), EncString(ch.GetName())),
Body: ch,
}, nil
}
return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, checkBucket, EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, checkIndexBucket, false),
}
}
// FindCheckByID retrieves a check by id.
func (s *Service) FindCheckByID(ctx context.Context, id influxdb.ID) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
err := s.kv.View(ctx, func(tx Tx) error {
chkVal, err := s.findCheckByID(ctx, tx, id)
if err != nil {
return err
}
c = chkVal
return nil
})
if err != nil {
return nil, err
}
return c, nil
}
func (s *Service) findCheckByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.Check, error) {
chkVal, err := s.checkStore.FindEnt(ctx, tx, Entity{PK: EncID(id)})
if err != nil {
return nil, err
}
return chkVal.(influxdb.Check), nil
}
func (s *Service) findCheckByName(ctx context.Context, tx Tx, orgID influxdb.ID, name string) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
chVal, err := s.checkStore.FindEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(orgID), EncString(name)),
})
if IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Err: err,
}
}
if err != nil {
return nil, err
}
return chVal.(influxdb.Check), nil
}
// FindCheck retrives a check using an arbitrary check filter.
// Filters using ID, or OrganizationID and check Name should be efficient.
// Other filters will do a linear scan across checks until it finds a match.
func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if filter.ID != nil {
return s.FindCheckByID(ctx, *filter.ID)
}
if filter.Org != nil {
o, err := s.FindOrganizationByName(ctx, *filter.Org)
if err != nil {
return nil, err
}
filter.OrgID = &o.ID
}
var c influxdb.Check
err := s.kv.View(ctx, func(tx Tx) error {
if filter.OrgID != nil && filter.Name != nil {
ch, err := s.findCheckByName(ctx, tx, *filter.OrgID, *filter.Name)
c = ch
return err
}
var prefix []byte
if filter.OrgID != nil {
ent := Entity{UniqueKey: EncID(*filter.OrgID)}
prefix, _ = s.checkStore.IndexStore.EntKey(ctx, ent)
}
filterFn := filterChecksFn(nil, filter)
return s.checkStore.Find(ctx, tx, FindOpts{
Prefix: prefix,
Limit: 1,
FilterEntFn: func(k []byte, v interface{}) bool {
ch, ok := v.(influxdb.Check)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return false
}
return filterFn(ch)
},
CaptureFn: func(key []byte, decodedVal interface{}) error {
c, _ = decodedVal.(influxdb.Check)
return nil
},
})
})
if err != nil {
return nil, err
}
if c == nil {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: "check not found",
}
}
return c, nil
}
func filterChecksFn(idMap map[influxdb.ID]bool, filter influxdb.CheckFilter) func(c influxdb.Check) bool {
return func(c influxdb.Check) bool {
if filter.ID != nil && c.GetID() != *filter.ID {
return false
}
if filter.OrgID != nil && c.GetOrgID() != *filter.OrgID {
return false
}
if filter.Name != nil && c.GetName() != *filter.Name {
return false
}
if idMap == nil {
return true
}
return idMap[c.GetID()]
}
}
// FindChecks retrieves all checks that match an arbitrary check filter.
// Filters using ID, or OrganizationID and check Name should be efficient.
// Other filters will do a linear scan across all checks searching for a match.
func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, opts ...influxdb.FindOptions) ([]influxdb.Check, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if filter.ID != nil {
c, err := s.FindCheckByID(ctx, *filter.ID)
if err != nil {
return nil, 0, err
}
return []influxdb.Check{c}, 1, nil
}
m, _, err := s.FindUserResourceMappings(ctx, filter.UserResourceMappingFilter)
if err != nil || len(m) == 0 {
return nil, 0, err
}
idMap := make(map[influxdb.ID]bool)
for _, item := range m {
idMap[item.ResourceID] = true
}
var checks []influxdb.Check
err = s.kv.View(ctx, func(tx Tx) error {
if filter.Org != nil {
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return &influxdb.Error{Err: err}
}
filter.OrgID = &o.ID
}
var opt influxdb.FindOptions
if len(opts) > 0 {
opt = opts[0]
}
filterFn := filterChecksFn(idMap, filter)
return s.checkStore.Find(ctx, tx, FindOpts{
Descending: opt.Descending,
Offset: opt.Offset,
Limit: opt.Limit,
FilterEntFn: func(k []byte, v interface{}) bool {
ch, ok := v.(influxdb.Check)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return false
}
return filterFn(ch)
},
CaptureFn: func(key []byte, decodedVal interface{}) error {
c, ok := decodedVal.(influxdb.Check)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return err
}
checks = append(checks, c)
return nil
},
})
})
if err != nil {
return nil, 0, err
}
return checks, len(checks), nil
}
// CreateCheck creates a influxdb check and sets ID.
func (s *Service) CreateCheck(ctx context.Context, c influxdb.CheckCreate, userID influxdb.ID) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if err := c.Status.Valid(); err != nil {
return err
}
if c.GetOrgID().Valid() {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if _, err := s.FindOrganizationByID(ctx, c.GetOrgID()); err != nil {
return &influxdb.Error{
Code: influxdb.ENotFound,
Op: influxdb.OpCreateCheck,
Err: err,
}
}
}
return s.kv.Update(ctx, func(tx Tx) error {
return s.createCheck(ctx, tx, c, userID)
})
}
func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.CheckCreate, userID influxdb.ID) error {
c.SetID(s.IDGenerator.ID())
c.SetOwnerID(userID)
now := s.Now()
c.SetCreatedAt(now)
c.SetUpdatedAt(now)
if err := c.Valid(s.FluxLanguageService); err != nil {
return err
}
t, err := s.createCheckTask(ctx, tx, c)
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "Could not create task from check",
Err: err,
}
}
c.SetTaskID(t.ID)
if err := s.putCheck(ctx, tx, c, PutNew()); err != nil {
return err
}
return s.createUserResourceMappingForOrg(ctx, tx, c.GetOrgID(), c.GetID(), influxdb.ChecksResourceType)
}
func (s *Service) createCheckTask(ctx context.Context, tx Tx, c influxdb.CheckCreate) (*influxdb.Task, error) {
script, err := c.GenerateFlux(s.FluxLanguageService)
if err != nil {
return nil, err
}
tc := influxdb.TaskCreate{
Type: c.Type(),
Flux: script,
OwnerID: c.GetOwnerID(),
OrganizationID: c.GetOrgID(),
Status: string(c.Status),
}
t, err := s.createTask(ctx, tx, tc)
if err != nil {
return nil, err
}
return t, nil
}
// PutCheck will put a check without setting an ID.
func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {
if err := c.Valid(s.FluxLanguageService); err != nil {
return err
}
return s.kv.Update(ctx, func(tx Tx) error {
return s.putCheck(ctx, tx, c)
})
}
func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check, opts ...PutOptionFn) error {
return s.checkStore.Put(ctx, tx, Entity{
PK: EncID(c.GetID()),
UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())),
Body: c,
}, opts...)
}
// PatchCheck updates a check according the parameters set on upd.
func (s *Service) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
err := s.kv.Update(ctx, func(tx Tx) error {
chk, err := s.patchCheck(ctx, tx, id, upd)
if err != nil {
return err
}
c = chk
return nil
})
return c, err
}
// UpdateCheck updates the check.
func (s *Service) UpdateCheck(ctx context.Context, id influxdb.ID, chk influxdb.CheckCreate) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
err := s.kv.Update(ctx, func(tx Tx) error {
chk, err := s.updateCheck(ctx, tx, id, chk)
if err != nil {
return err
}
c = chk
return nil
})
return c, err
}
func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk influxdb.CheckCreate) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
current, err := s.findCheckByID(ctx, tx, id)
if err != nil {
return nil, err
}
if chk.GetName() != current.GetName() {
c0, err := s.findCheckByName(ctx, tx, current.GetOrgID(), chk.GetName())
if err == nil && c0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "check name is not unique",
}
}
ent := Entity{
UniqueKey: Encode(EncID(current.GetOrgID()), EncString(current.GetName())),
}
if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil {
return nil, err
}
}
chk.SetTaskID(current.GetTaskID())
flux, err := chk.GenerateFlux(s.FluxLanguageService)
if err != nil {
return nil, err
}
tu := influxdb.TaskUpdate{
Flux: &flux,
Description: strPtr(chk.GetDescription()),
}
if chk.Status != "" {
tu.Status = strPtr(string(chk.Status))
}
if _, err := s.updateTask(ctx, tx, chk.GetTaskID(), tu); err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
chk.SetID(current.GetID())
chk.SetOrgID(current.GetOrgID())
chk.SetOwnerID(current.GetOwnerID())
chk.SetCreatedAt(current.GetCRUDLog().CreatedAt)
chk.SetUpdatedAt(s.Now())
if err := chk.Valid(s.FluxLanguageService); err != nil {
return nil, err
}
if err := chk.Status.Valid(); err != nil {
return nil, err
}
if err := s.putCheck(ctx, tx, chk.Check); err != nil {
return nil, err
}
return chk.Check, nil
}
func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
c, err := s.findCheckByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
c.SetName(*upd.Name)
}
if upd.Description != nil {
c.SetDescription(*upd.Description)
}
c.SetUpdatedAt(s.Now())
tu := influxdb.TaskUpdate{
Description: strPtr(c.GetDescription()),
}
if upd.Status != nil {
tu.Status = strPtr(string(*upd.Status))
}
if err := c.Valid(s.FluxLanguageService); err != nil {
return nil, err
}
if err := s.putCheck(ctx, tx, c, PutUpdate()); err != nil {
return nil, err
}
if _, err := s.updateTask(ctx, tx, c.GetTaskID(), tu); err != nil {
return nil, err
}
return c, nil
}
// DeleteCheck deletes a check and prunes it from the index.
func (s *Service) DeleteCheck(ctx context.Context, id influxdb.ID) error {
ch, err := s.FindCheckByID(ctx, id)
if err != nil {
return err
}
return s.kv.Update(ctx, func(tx Tx) error {
err := s.checkStore.DeleteEnt(ctx, tx, Entity{
PK: EncID(id),
})
if err != nil {
return err
}
if err := s.deleteTask(ctx, tx, ch.GetTaskID()); err != nil {
return err
}
return s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: id,
ResourceType: influxdb.ChecksResourceType,
})
})
}
func strPtr(s string) *string {
ss := new(string)
*ss = s
return ss
}

View File

@ -1,79 +0,0 @@
package kv_test
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/query/fluxlang"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestBoltCheckService(t *testing.T) {
influxdbtesting.CheckService(initBoltCheckService, t)
}
func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, *kv.Service, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initCheckService(s, f, t)
return svc, svc, op, func() {
closeSvc()
closeBolt()
}
}
func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testing.T) (*kv.Service, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
if f.TimeGenerator == nil {
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
}
for _, c := range f.Checks {
if err := svc.PutCheck(ctx, c); err != nil {
t.Fatalf("failed to populate checks")
}
}
for _, tc := range f.Tasks {
if _, err := svc.CreateTask(ctx, tc); err != nil {
t.Fatalf("failed to populate tasks: %v", err)
}
}
return svc, kv.OpPrefix, func() {
for _, o := range f.Organizations {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
t.Logf("failed to remove organization: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
for _, c := range f.Checks {
if err := svc.DeleteCheck(ctx, c.GetID()); err != nil {
t.Logf("failed to remove check: %v", err)
}
}
}
}

View File

@ -54,10 +54,10 @@ func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
[]byte("templates/documents/content"),
[]byte("templates/documents/meta"),
// store base backed services
checkBucket,
checkIndexBucket,
notificationEndpointBucket,
notificationEndpointIndexBucket,
[]byte("checksv1"),
[]byte("checkindexv1"),
[]byte("notificationEndpointv1"),
[]byte("notificationEndpointIndexv1"),
variableBucket,
variableIndexBucket,
variableOrgsIndex,

View File

@ -1,334 +0,0 @@
package kv
import (
"context"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/notification/endpoint"
)
var (
// ErrNotificationEndpointNotFound is used when the notification endpoint is not found.
ErrNotificationEndpointNotFound = &influxdb.Error{
Msg: "notification endpoint not found",
Code: influxdb.ENotFound,
}
notificationEndpointBucket = []byte("notificationEndpointv1")
notificationEndpointIndexBucket = []byte("notificationEndpointIndexv1")
)
var _ influxdb.NotificationEndpointService = (*Service)(nil)
func newEndpointStore() *IndexStore {
const resource = "notification endpoint"
var decEndpointEntFn DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) {
edp, err := endpoint.UnmarshalJSON(val)
return key, edp, err
}
var decValToEntFn ConvertValToEntFn = func(_ []byte, v interface{}) (Entity, error) {
edp, ok := v.(influxdb.NotificationEndpoint)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
return Entity{
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}, nil
}
return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, notificationEndpointBucket, EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, notificationEndpointIndexBucket, true),
}
}
// CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier.
func (s *Service) CreateNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
return s.createNotificationEndpoint(ctx, tx, edp, userID)
})
}
func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
// TODO(jsteenb2): why is org id check not necesssary if orgID isn't valid... feels odd
if edp.GetOrgID().Valid() {
span, ctx := tracing.StartSpanFromContext(ctx)
// TODO(jsteenb2): this defer doesn't get called until the end of entire function,
// need to rip this out as is
defer span.Finish()
if _, err := s.findOrganizationByID(ctx, tx, edp.GetOrgID()); err != nil {
return err
}
}
id := s.IDGenerator.ID()
edp.SetID(id)
now := s.TimeGenerator.Now()
edp.SetCreatedAt(now)
edp.SetUpdatedAt(now)
edp.BackfillSecretKeys()
if err := edp.Valid(); err != nil {
return err
}
ent := Entity{
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent, PutNew()); err != nil {
return err
}
urm := &influxdb.UserResourceMapping{
ResourceID: edp.GetID(),
UserID: userID,
UserType: influxdb.Owner,
ResourceType: influxdb.NotificationEndpointResourceType,
}
return s.createUserResourceMapping(ctx, tx, urm)
}
// UpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
func (s *Service) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
var err error
err = s.kv.Update(ctx, func(tx Tx) error {
edp, err = s.updateNotificationEndpoint(ctx, tx, id, edp, userID)
return err
})
return edp, err
}
func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
current, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
edp.SetCreatedAt(current.GetCRUDLog().CreatedAt)
edp.SetUpdatedAt(s.TimeGenerator.Now())
if err := edp.Valid(); err != nil {
return nil, err
}
ent := Entity{
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent, PutUpdate()); err != nil {
return nil, err
}
return edp, nil
}
// PatchNotificationEndpoint updates a single notification endpoint with changeset.
// Returns the new notification endpoint state after update.
func (s *Service) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
var edp influxdb.NotificationEndpoint
if err := s.kv.Update(ctx, func(tx Tx) (err error) {
edp, err = s.patchNotificationEndpoint(ctx, tx, id, upd)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return edp, nil
}
func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
edp, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
edp.SetName(*upd.Name)
}
if upd.Description != nil {
edp.SetDescription(*upd.Description)
}
if upd.Status != nil {
edp.SetStatus(*upd.Status)
}
edp.SetUpdatedAt(s.TimeGenerator.Now())
if err := edp.Valid(); err != nil {
return nil, err
}
// TODO(jsteenb2): every above here moves into service layer
ent := Entity{
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent, PutUpdate()); err != nil {
return nil, err
}
return edp, nil
}
// PutNotificationEndpoint put a notification endpoint to storage.
func (s *Service) PutNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint) error {
// TODO(jsteenb2): all the stuffs before the update should be moved up into the
// service layer as well as all the id/time setting items
if err := edp.Valid(); err != nil {
return err
}
return s.kv.Update(ctx, func(tx Tx) (err error) {
ent := Entity{
PK: EncID(edp.GetID()),
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
return s.endpointStore.Put(ctx, tx, ent)
})
}
// FindNotificationEndpointByID returns a single notification endpoint by ID.
func (s *Service) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
var (
edp influxdb.NotificationEndpoint
err error
)
err = s.kv.View(ctx, func(tx Tx) error {
edp, err = s.findNotificationEndpointByID(ctx, tx, id)
return err
})
return edp, err
}
func (s *Service) findNotificationEndpointByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
decodedEnt, err := s.endpointStore.FindEnt(ctx, tx, Entity{PK: EncID(id)})
if err != nil {
return nil, err
}
edp, ok := decodedEnt.(influxdb.NotificationEndpoint)
return edp, IsErrUnexpectedDecodeVal(ok)
}
// FindNotificationEndpoints returns a list of notification endpoints that match isNext and the total count of matching notification endpoints.
// Additional options provide pagination & sorting.
func (s *Service) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) (edps []influxdb.NotificationEndpoint, n int, err error) {
err = s.kv.View(ctx, func(tx Tx) error {
edps, n, err = s.findNotificationEndpoints(ctx, tx, filter, opt...)
return err
})
return edps, n, err
}
func (s *Service) findNotificationEndpoints(ctx context.Context, tx Tx, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
m, err := s.findUserResourceMappings(ctx, tx, filter.UserResourceMappingFilter)
if err != nil {
return nil, 0, err
}
if len(m) == 0 {
return []influxdb.NotificationEndpoint{}, 0, nil
}
idMap := make(map[influxdb.ID]bool)
for _, item := range m {
idMap[item.ResourceID] = true
}
if filter.Org != nil {
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return nil, 0, &influxdb.Error{
Err: err,
}
}
filter.OrgID = &o.ID
}
var o influxdb.FindOptions
if len(opt) > 0 {
o = opt[0]
}
edps := make([]influxdb.NotificationEndpoint, 0)
err = s.endpointStore.Find(ctx, tx, FindOpts{
Descending: o.Descending,
Offset: o.Offset,
Limit: o.Limit,
FilterEntFn: filterEndpointsFn(idMap, filter),
CaptureFn: func(k []byte, v interface{}) error {
edp, ok := v.(influxdb.NotificationEndpoint)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return err
}
edps = append(edps, edp)
return nil
},
})
if err != nil {
return nil, 0, err
}
return edps, len(edps), err
}
func filterEndpointsFn(idMap map[influxdb.ID]bool, filter influxdb.NotificationEndpointFilter) func([]byte, interface{}) bool {
return func(key []byte, val interface{}) bool {
edp := val.(influxdb.NotificationEndpoint)
if filter.ID != nil && edp.GetID() != *filter.ID {
return false
}
if filter.OrgID != nil && edp.GetOrgID() != *filter.OrgID {
return false
}
if idMap == nil {
return true
}
return idMap[edp.GetID()]
}
}
// DeleteNotificationEndpoint removes a notification endpoint by ID.
func (s *Service) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) {
err = s.kv.Update(ctx, func(tx Tx) error {
flds, orgID, err = s.deleteNotificationEndpoint(ctx, tx, id)
return err
})
return flds, orgID, err
}
func (s *Service) deleteNotificationEndpoint(ctx context.Context, tx Tx, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) {
edp, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, 0, err
}
if err := s.endpointStore.DeleteEnt(ctx, tx, Entity{PK: EncID(id)}); err != nil {
return nil, 0, err
}
return edp.SecretFields(), edp.GetOrgID(), s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: id,
ResourceType: influxdb.NotificationEndpointResourceType,
})
}

View File

@ -1,77 +0,0 @@
package kv_test
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/kv"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func TestNotificationEndpointService(t *testing.T) {
influxdbtesting.NotificationEndpointService(initBoltNotificationEndpointService, t)
}
func initBoltNotificationEndpointService(f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
store, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, secretSVC, closeSvc := initNotificationEndpointService(store, f, t)
return svc, secretSVC, func() {
closeSvc()
closeBolt()
}
}
func initNotificationEndpointService(s kv.SchemaStore, f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
if f.TimeGenerator == nil {
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
for _, edp := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, edp); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)
}
}
for _, o := range f.Orgs {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
endpointSVC := endpoints.NewService(svc, svc, svc, svc)
return endpointSVC, svc, func() {
for _, edp := range f.NotificationEndpoints {
if _, _, err := svc.DeleteNotificationEndpoint(ctx, edp.GetID()); err != nil && err != kv.ErrNotificationEndpointNotFound {
t.Logf("failed to remove notification endpoint: %v", err)
}
}
for _, o := range f.Orgs {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
t.Fatalf("failed to remove org: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
}
}

View File

@ -44,8 +44,6 @@ type Service struct {
influxdb.TimeGenerator
Hash Crypt
checkStore *IndexStore
endpointStore *IndexStore
variableStore *IndexStore
urmByUserIndex *Index
@ -64,8 +62,6 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
kv: kv,
audit: noop.ResourceLogger{},
TimeGenerator: influxdb.RealTimeGenerator{},
checkStore: newCheckStore(),
endpointStore: newEndpointStore(),
variableStore: newVariableStore(),
urmByUserIndex: NewIndex(URMByUserIndexMapping, WithIndexReadPathEnabled),
}

View File

@ -16,18 +16,21 @@ import (
influxTesting "github.com/influxdata/influxdb/v2/testing"
)
const (
id1 = "020f755c3c082000"
id3 = "020f755c3c082002"
)
var (
id1 = influxTesting.MustIDBase16Ptr("020f755c3c082000")
id3 = influxTesting.MustIDBase16Ptr("020f755c3c082002")
var goodBase = endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
Status: influxdb.Active,
Description: "desc1",
}
timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
goodBase = endpoint.Base{
ID: id1,
Name: "name1",
OrgID: id3,
Status: influxdb.Active,
Description: "desc1",
}
)
func TestValidEndpoint(t *testing.T) {
cases := []struct {
@ -48,9 +51,9 @@ func TestValidEndpoint(t *testing.T) {
name: "invalid status",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
},
},
err: &influxdb.Error{
@ -62,12 +65,12 @@ func TestValidEndpoint(t *testing.T) {
name: "empty name PagerDuty",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
OrgID: influxTesting.MustIDBase16Ptr(id3),
ID: id1,
OrgID: id3,
Status: influxdb.Active,
},
ClientURL: "https://events.pagerduty.com/v2/enqueue",
RoutingKey: influxdb.SecretField{Key: id1 + "-routing-key"},
RoutingKey: influxdb.SecretField{Key: id1.String() + "-routing-key"},
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
@ -78,11 +81,11 @@ func TestValidEndpoint(t *testing.T) {
name: "empty name Telegram",
src: &endpoint.Telegram{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
OrgID: influxTesting.MustIDBase16Ptr(id3),
ID: id1,
OrgID: id3,
Status: influxdb.Active,
},
Token: influxdb.SecretField{Key: id1 + "-token"},
Token: influxdb.SecretField{Key: id1.String() + "-token"},
Channel: "-1001406363649",
},
err: &influxdb.Error{
@ -177,7 +180,7 @@ func TestValidEndpoint(t *testing.T) {
name: "empty telegram channel",
src: &endpoint.Telegram{
Base: goodBase,
Token: influxdb.SecretField{Key: id1 + "-token"},
Token: influxdb.SecretField{Key: id1.String() + "-token"},
},
err: &influxdb.Error{
Code: influxdb.EInvalid,
@ -188,7 +191,7 @@ func TestValidEndpoint(t *testing.T) {
name: "valid telegram token",
src: &endpoint.Telegram{
Base: goodBase,
Token: influxdb.SecretField{Key: id1 + "-token"},
Token: influxdb.SecretField{Key: id1.String() + "-token"},
Channel: "-1001406363649",
},
err: nil,
@ -208,9 +211,6 @@ func TestValidEndpoint(t *testing.T) {
}
}
var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
func TestJSON(t *testing.T) {
cases := []struct {
name string
@ -220,9 +220,9 @@ func TestJSON(t *testing.T) {
name: "simple Slack",
src: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -237,9 +237,9 @@ func TestJSON(t *testing.T) {
name: "Slack without token",
src: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -253,9 +253,9 @@ func TestJSON(t *testing.T) {
name: "simple pagerduty",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -270,9 +270,9 @@ func TestJSON(t *testing.T) {
name: "simple http",
src: &endpoint.HTTP{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -293,9 +293,9 @@ func TestJSON(t *testing.T) {
name: "simple Telegram",
src: &endpoint.Telegram{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "nameTelegram",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -331,9 +331,9 @@ func TestBackFill(t *testing.T) {
name: "simple Slack",
src: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -347,9 +347,9 @@ func TestBackFill(t *testing.T) {
},
target: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -358,7 +358,7 @@ func TestBackFill(t *testing.T) {
},
URL: "https://slack.com/api/chat.postMessage",
Token: influxdb.SecretField{
Key: id1 + "-token",
Key: id1.String() + "-token",
Value: strPtr("token-value"),
},
},
@ -367,9 +367,9 @@ func TestBackFill(t *testing.T) {
name: "simple pagerduty",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -383,9 +383,9 @@ func TestBackFill(t *testing.T) {
},
target: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -394,7 +394,7 @@ func TestBackFill(t *testing.T) {
},
ClientURL: "https://events.pagerduty.com/v2/enqueue",
RoutingKey: influxdb.SecretField{
Key: id1 + "-routing-key",
Key: id1.String() + "-routing-key",
Value: strPtr("routing-key-value"),
},
},
@ -403,9 +403,9 @@ func TestBackFill(t *testing.T) {
name: "http with token",
src: &endpoint.HTTP{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -423,9 +423,9 @@ func TestBackFill(t *testing.T) {
},
target: &endpoint.HTTP{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -435,11 +435,11 @@ func TestBackFill(t *testing.T) {
AuthMethod: "basic",
URL: "http://example.com",
Username: influxdb.SecretField{
Key: id1 + "-username",
Key: id1.String() + "-username",
Value: strPtr("username1"),
},
Password: influxdb.SecretField{
Key: id1 + "-password",
Key: id1.String() + "-password",
Value: strPtr("password1"),
},
},
@ -448,9 +448,9 @@ func TestBackFill(t *testing.T) {
name: "simple Telegram",
src: &endpoint.Telegram{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -463,9 +463,9 @@ func TestBackFill(t *testing.T) {
},
target: &endpoint.Telegram{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -473,7 +473,7 @@ func TestBackFill(t *testing.T) {
},
},
Token: influxdb.SecretField{
Key: id1 + "-token",
Key: id1.String() + "-token",
Value: strPtr("token-value"),
},
},
@ -497,9 +497,9 @@ func TestSecretFields(t *testing.T) {
name: "simple Slack",
src: &endpoint.Slack{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -508,13 +508,13 @@ func TestSecretFields(t *testing.T) {
},
URL: "https://slack.com/api/chat.postMessage",
Token: influxdb.SecretField{
Key: id1 + "-token",
Key: id1.String() + "-token",
Value: strPtr("token-value"),
},
},
secrets: []influxdb.SecretField{
{
Key: id1 + "-token",
Key: id1.String() + "-token",
Value: strPtr("token-value"),
},
},
@ -523,9 +523,9 @@ func TestSecretFields(t *testing.T) {
name: "simple pagerduty",
src: &endpoint.PagerDuty{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -534,13 +534,13 @@ func TestSecretFields(t *testing.T) {
},
ClientURL: "https://events.pagerduty.com/v2/enqueue",
RoutingKey: influxdb.SecretField{
Key: id1 + "-routing-key",
Key: id1.String() + "-routing-key",
Value: strPtr("routing-key-value"),
},
},
secrets: []influxdb.SecretField{
{
Key: id1 + "-routing-key",
Key: id1.String() + "-routing-key",
Value: strPtr("routing-key-value"),
},
},
@ -549,9 +549,9 @@ func TestSecretFields(t *testing.T) {
name: "http with user and password",
src: &endpoint.HTTP{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -561,21 +561,21 @@ func TestSecretFields(t *testing.T) {
AuthMethod: "basic",
URL: "http://example.com",
Username: influxdb.SecretField{
Key: id1 + "-username",
Key: id1.String() + "-username",
Value: strPtr("user1"),
},
Password: influxdb.SecretField{
Key: id1 + "-password",
Key: id1.String() + "-password",
Value: strPtr("password1"),
},
},
secrets: []influxdb.SecretField{
{
Key: id1 + "-username",
Key: id1.String() + "-username",
Value: strPtr("user1"),
},
{
Key: id1 + "-password",
Key: id1.String() + "-password",
Value: strPtr("password1"),
},
},
@ -584,9 +584,9 @@ func TestSecretFields(t *testing.T) {
name: "simple Telegram",
src: &endpoint.Telegram{
Base: endpoint.Base{
ID: influxTesting.MustIDBase16Ptr(id1),
ID: id1,
Name: "name1",
OrgID: influxTesting.MustIDBase16Ptr(id3),
OrgID: id3,
Status: influxdb.Active,
CRUDLog: influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
@ -594,13 +594,13 @@ func TestSecretFields(t *testing.T) {
},
},
Token: influxdb.SecretField{
Key: id1 + "-token",
Key: id1.String() + "-token",
Value: strPtr("token-value"),
},
},
secrets: []influxdb.SecretField{
{
Key: id1 + "-token",
Key: id1.String() + "-token",
Value: strPtr("token-value"),
},
},

View File

@ -1,28 +1,22 @@
package endpoints
package service
import (
"context"
"github.com/influxdata/influxdb/v2"
influxdb "github.com/influxdata/influxdb/v2"
)
// Service provides all the notification endpoint service behavior.
type Service struct {
endpointStore influxdb.NotificationEndpointService
secretSVC influxdb.SecretService
// TODO(jsteenb2): NUKE THESE 2 embedded services after fixing up the domain!
influxdb.UserResourceMappingService
influxdb.OrganizationService
}
// NewService constructs a new Service.
func NewService(store influxdb.NotificationEndpointService, secretSVC influxdb.SecretService, urmSVC influxdb.UserResourceMappingService, orgSVC influxdb.OrganizationService) *Service {
// New constructs a new Service.
func New(store influxdb.NotificationEndpointService, secretSVC influxdb.SecretService) *Service {
return &Service{
endpointStore: store,
secretSVC: secretSVC,
UserResourceMappingService: urmSVC,
OrganizationService: orgSVC,
endpointStore: store,
secretSVC: secretSVC,
}
}

View File

@ -1,45 +1,43 @@
package endpoints_test
package service_test
import (
"context"
"testing"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/endpoints"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/notification/endpoint/service"
"github.com/influxdata/influxdb/v2/pkg/pointer"
"github.com/influxdata/influxdb/v2/tenant"
influxTesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
var id1 = influxTesting.MustIDBase16Ptr("020f755c3c082000")
var id2 = influxTesting.MustIDBase16Ptr("020f755c3c082001")
var orgID = influxTesting.MustIDBase16Ptr("a10f755c3c082001")
var userID = influxTesting.MustIDBase16Ptr("b10f755c3c082001")
var (
id1 = influxTesting.MustIDBase16Ptr("020f755c3c082000")
id2 = influxTesting.MustIDBase16Ptr("020f755c3c082001")
orgID = influxTesting.MustIDBase16Ptr("a10f755c3c082001")
userID = influxTesting.MustIDBase16Ptr("b10f755c3c082001")
var timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
var timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
var testCrudLog = influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
}
timeGen1 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 13, 4, 19, 10, 0, time.UTC)}
timeGen2 = mock.TimeGenerator{FakeValue: time.Date(2006, time.July, 14, 5, 23, 53, 10, time.UTC)}
// newInmemService creates a new in-memory secret service
func newInmemService(t *testing.T) *kv.Service {
testCrudLog = influxdb.CRUDLog{
CreatedAt: timeGen1.Now(),
UpdatedAt: timeGen2.Now(),
}
)
func newSecretService(t *testing.T, ctx context.Context, logger *zap.Logger, s kv.Store) influxdb.SecretService {
t.Helper()
store := inmem.NewKVStore()
logger := zaptest.NewLogger(t)
ctx := context.Background()
// initialize the store
if err := all.Up(ctx, logger, store); err != nil {
t.Fatal(err)
}
svc := kv.NewService(logger, store)
tenantSvc := tenant.NewService(tenant.NewStore(s))
// initialize organization
org := influxdb.Organization{
@ -48,21 +46,26 @@ func newInmemService(t *testing.T) *kv.Service {
CRUDLog: testCrudLog,
}
if err := svc.CreateOrganization(ctx, &org); err != nil {
if err := tenantSvc.CreateOrganization(ctx, &org); err != nil {
t.Fatal(err)
}
orgID = &org.ID // orgID is generated
return svc
return kv.NewService(logger, s)
}
// TestEndpointService_cummulativeSecrets tests that secrets are cummulatively added/updated and removed upon delete
// see https://github.com/influxdata/influxdb/pull/19082 for details
func TestEndpointService_cummulativeSecrets(t *testing.T) {
inMemService := newInmemService(t)
endpointService := endpoints.NewService(inMemService, inMemService, inMemService, inMemService)
secretService := inMemService
ctx := context.Background()
store := inmem.NewKVStore()
logger := zaptest.NewLogger(t)
if err := all.Up(ctx, logger, store); err != nil {
t.Fatal(err)
}
secretService := newSecretService(t, ctx, logger, store)
endpointService := service.New(service.NewStore(store), secretService)
var endpoint1 = endpoint.HTTP{
Base: endpoint.Base{
@ -79,8 +82,8 @@ func TestEndpointService_cummulativeSecrets(t *testing.T) {
AuthMethod: "basic",
Method: "POST",
URL: "http://example.com",
Username: influxdb.SecretField{Key: id1.String() + "username-key", Value: strPtr("val1")},
Password: influxdb.SecretField{Key: id1.String() + "password-key", Value: strPtr("val2")},
Username: influxdb.SecretField{Key: id1.String() + "username-key", Value: pointer.String("val1")},
Password: influxdb.SecretField{Key: id1.String() + "password-key", Value: pointer.String("val2")},
}
var endpoint2 = endpoint.HTTP{
Base: endpoint.Base{
@ -97,8 +100,8 @@ func TestEndpointService_cummulativeSecrets(t *testing.T) {
AuthMethod: "basic",
Method: "POST",
URL: "http://example2.com",
Username: influxdb.SecretField{Key: id2.String() + "username-key", Value: strPtr("val3")},
Password: influxdb.SecretField{Key: id2.String() + "password-key", Value: strPtr("val4")},
Username: influxdb.SecretField{Key: id2.String() + "username-key", Value: pointer.String("val3")},
Password: influxdb.SecretField{Key: id2.String() + "password-key", Value: pointer.String("val4")},
}
var err error
var secretKeys []string
@ -127,7 +130,7 @@ func TestEndpointService_cummulativeSecrets(t *testing.T) {
// update 1st endpoint and validate secrets
const updatedSecretValue = "updatedSecVal"
endpoint1.Username.Value = strPtr(updatedSecretValue)
endpoint1.Username.Value = pointer.String(updatedSecretValue)
if _, err = endpointService.UpdateNotificationEndpoint(ctx, *endpoint1.ID, &endpoint1, *userID); err != nil {
t.Fatal(err)
}
@ -175,8 +178,3 @@ func TestEndpointService_cummulativeSecrets(t *testing.T) {
t.Errorf("secrets after deleting the 2nd endpoint = %v, want %v", len(secretKeys), 2)
}
}
// strPtr returns string pointer
func strPtr(s string) *string {
return &s
}

View File

@ -0,0 +1,305 @@
package service
import (
"context"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/notification/endpoint"
"github.com/influxdata/influxdb/v2/snowflake"
)
var (
// ErrNotificationEndpointNotFound is used when the notification endpoint is not found.
ErrNotificationEndpointNotFound = &influxdb.Error{
Msg: "notification endpoint not found",
Code: influxdb.ENotFound,
}
notificationEndpointBucket = []byte("notificationEndpointv1")
notificationEndpointIndexBucket = []byte("notificationEndpointIndexv1")
)
var _ influxdb.NotificationEndpointService = (*Store)(nil)
func newEndpointStore() *kv.IndexStore {
const resource = "notification endpoint"
var decEndpointEntFn kv.DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) {
edp, err := endpoint.UnmarshalJSON(val)
return key, edp, err
}
var decValToEntFn kv.ConvertValToEntFn = func(_ []byte, v interface{}) (kv.Entity, error) {
edp, ok := v.(influxdb.NotificationEndpoint)
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
return kv.Entity{}, err
}
return kv.Entity{
PK: kv.EncID(edp.GetID()),
UniqueKey: kv.Encode(kv.EncID(edp.GetOrgID()), kv.EncString(edp.GetName())),
Body: edp,
}, nil
}
return &kv.IndexStore{
Resource: resource,
EntStore: kv.NewStoreBase(resource, notificationEndpointBucket, kv.EncIDKey, kv.EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: kv.NewOrgNameKeyStore(resource, notificationEndpointIndexBucket, true),
}
}
type Store struct {
kv kv.Store
endpointStore *kv.IndexStore
IDGenerator influxdb.IDGenerator
TimeGenerator influxdb.TimeGenerator
}
func NewStore(store kv.Store) *Store {
return &Store{
kv: store,
endpointStore: newEndpointStore(),
IDGenerator: snowflake.NewDefaultIDGenerator(),
TimeGenerator: influxdb.RealTimeGenerator{},
}
}
// CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier.
func (s *Store) CreateNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
return s.kv.Update(ctx, func(tx kv.Tx) error {
return s.createNotificationEndpoint(ctx, tx, edp, userID)
})
}
func (s *Store) createNotificationEndpoint(ctx context.Context, tx kv.Tx, edp influxdb.NotificationEndpoint, userID influxdb.ID) error {
id := s.IDGenerator.ID()
edp.SetID(id)
now := s.TimeGenerator.Now()
edp.SetCreatedAt(now)
edp.SetUpdatedAt(now)
edp.BackfillSecretKeys()
if err := edp.Valid(); err != nil {
return err
}
ent := kv.Entity{
PK: kv.EncID(edp.GetID()),
UniqueKey: kv.Encode(kv.EncID(edp.GetOrgID()), kv.EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent, kv.PutNew()); err != nil {
return err
}
return nil
}
// UpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
func (s *Store) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
var err error
err = s.kv.Update(ctx, func(tx kv.Tx) error {
edp, err = s.updateNotificationEndpoint(ctx, tx, id, edp, userID)
return err
})
return edp, err
}
func (s *Store) updateNotificationEndpoint(ctx context.Context, tx kv.Tx, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
current, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, err
}
// ID and OrganizationID can not be updated
edp.SetCreatedAt(current.GetCRUDLog().CreatedAt)
edp.SetUpdatedAt(s.TimeGenerator.Now())
if err := edp.Valid(); err != nil {
return nil, err
}
ent := kv.Entity{
PK: kv.EncID(edp.GetID()),
UniqueKey: kv.Encode(kv.EncID(edp.GetOrgID()), kv.EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent, kv.PutUpdate()); err != nil {
return nil, err
}
return edp, nil
}
// PatchNotificationEndpoint updates a single notification endpoint with changeset.
// Returns the new notification endpoint state after update.
func (s *Store) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
var edp influxdb.NotificationEndpoint
if err := s.kv.Update(ctx, func(tx kv.Tx) (err error) {
edp, err = s.patchNotificationEndpoint(ctx, tx, id, upd)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return edp, nil
}
func (s *Store) patchNotificationEndpoint(ctx context.Context, tx kv.Tx, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) {
edp, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
edp.SetName(*upd.Name)
}
if upd.Description != nil {
edp.SetDescription(*upd.Description)
}
if upd.Status != nil {
edp.SetStatus(*upd.Status)
}
edp.SetUpdatedAt(s.TimeGenerator.Now())
if err := edp.Valid(); err != nil {
return nil, err
}
// TODO(jsteenb2): every above here moves into service layer
ent := kv.Entity{
PK: kv.EncID(edp.GetID()),
UniqueKey: kv.Encode(kv.EncID(edp.GetOrgID()), kv.EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent, kv.PutUpdate()); err != nil {
return nil, err
}
return edp, nil
}
// PutNotificationEndpoint put a notification endpoint to storage.
func (s *Store) PutNotificationEndpoint(ctx context.Context, edp influxdb.NotificationEndpoint) error {
// TODO(jsteenb2): all the stuffs before the update should be moved up into the
// service layer as well as all the id/time setting items
if err := edp.Valid(); err != nil {
return err
}
return s.kv.Update(ctx, func(tx kv.Tx) (err error) {
ent := kv.Entity{
PK: kv.EncID(edp.GetID()),
UniqueKey: kv.Encode(kv.EncID(edp.GetOrgID()), kv.EncString(edp.GetName())),
Body: edp,
}
return s.endpointStore.Put(ctx, tx, ent)
})
}
// FindNotificationEndpointByID returns a single notification endpoint by ID.
func (s *Store) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
var (
edp influxdb.NotificationEndpoint
err error
)
err = s.kv.View(ctx, func(tx kv.Tx) error {
edp, err = s.findNotificationEndpointByID(ctx, tx, id)
return err
})
return edp, err
}
func (s *Store) findNotificationEndpointByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (influxdb.NotificationEndpoint, error) {
decodedEnt, err := s.endpointStore.FindEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)})
if err != nil {
return nil, err
}
edp, ok := decodedEnt.(influxdb.NotificationEndpoint)
return edp, kv.IsErrUnexpectedDecodeVal(ok)
}
// FindNotificationEndpoints returns a list of notification endpoints that match isNext and the total count of matching notification endpoints.
// Additional options provide pagination & sorting.
func (s *Store) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) (edps []influxdb.NotificationEndpoint, n int, err error) {
err = s.kv.View(ctx, func(tx kv.Tx) error {
edps, n, err = s.findNotificationEndpoints(ctx, tx, filter, opt...)
return err
})
return edps, n, err
}
func (s *Store) findNotificationEndpoints(ctx context.Context, tx kv.Tx, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) {
var o influxdb.FindOptions
if len(opt) > 0 {
o = opt[0]
}
edps := make([]influxdb.NotificationEndpoint, 0)
err := s.endpointStore.Find(ctx, tx, kv.FindOpts{
Descending: o.Descending,
Offset: o.Offset,
Limit: o.Limit,
FilterEntFn: filterEndpointsFn(filter),
CaptureFn: func(k []byte, v interface{}) error {
edp, ok := v.(influxdb.NotificationEndpoint)
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
return err
}
edps = append(edps, edp)
return nil
},
})
if err != nil {
return nil, 0, err
}
return edps, len(edps), err
}
func filterEndpointsFn(filter influxdb.NotificationEndpointFilter) func([]byte, interface{}) bool {
return func(key []byte, val interface{}) bool {
edp := val.(influxdb.NotificationEndpoint)
if filter.ID != nil && edp.GetID() != *filter.ID {
return false
}
if filter.OrgID != nil && edp.GetOrgID() != *filter.OrgID {
return false
}
return true
}
}
// DeleteNotificationEndpoint removes a notification endpoint by ID.
func (s *Store) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) {
err = s.kv.Update(ctx, func(tx kv.Tx) error {
flds, orgID, err = s.deleteNotificationEndpoint(ctx, tx, id)
return err
})
return flds, orgID, err
}
func (s *Store) deleteNotificationEndpoint(ctx context.Context, tx kv.Tx, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) {
edp, err := s.findNotificationEndpointByID(ctx, tx, id)
if err != nil {
return nil, 0, err
}
if err := s.endpointStore.DeleteEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)}); err != nil {
return nil, 0, err
}
return edp.SecretFields(), edp.GetOrgID(), nil
}

View File

@ -0,0 +1,127 @@
package service_test
import (
"context"
"io/ioutil"
"os"
"testing"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/notification/endpoint/service"
endpointsTesting "github.com/influxdata/influxdb/v2/notification/endpoint/service/testing"
"github.com/influxdata/influxdb/v2/tenant"
"go.uber.org/zap/zaptest"
)
func TestNotificationEndpointService_WithInmem(t *testing.T) {
endpointsTesting.NotificationEndpointService(initInmemNotificationEndpointService, t)
}
func TestNotificationEndpointService_WithBolt(t *testing.T) {
endpointsTesting.NotificationEndpointService(initBoltNotificationEndpointService, t)
}
func NewTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) {
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
ctx := context.Background()
logger := zaptest.NewLogger(t)
path := f.Name()
// skip fsync to improve test performance
s := bolt.NewKVStore(logger, path, bolt.WithNoSync)
if err := s.Open(context.Background()); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}
close := func() {
s.Close()
os.Remove(path)
}
return s, close, nil
}
func initBoltNotificationEndpointService(f endpointsTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
store, closeStore, err := NewTestBoltStore(t)
if err != nil {
t.Fatal(err)
}
svc, secretSVC, closeSvc := initNotificationEndpointService(store, f, t)
return svc, secretSVC, func() {
closeSvc()
closeStore()
}
}
func initInmemNotificationEndpointService(f endpointsTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
svc, secretSVC, closeSvc := initNotificationEndpointService(store, f, t)
return svc, secretSVC, closeSvc
}
func initNotificationEndpointService(s kv.SchemaStore, f endpointsTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
logger := zaptest.NewLogger(t)
tenantStore := tenant.NewStore(s)
if f.IDGenerator != nil {
tenantStore.OrgIDGen = f.IDGenerator
tenantStore.IDGen = f.IDGenerator
}
tenantSvc := tenant.NewService(tenantStore)
secretSvc := kv.NewService(logger, s)
store := service.NewStore(s)
store.IDGenerator = f.IDGenerator
if f.TimeGenerator != nil {
store.TimeGenerator = f.TimeGenerator
}
endpointSvc := service.New(store, secretSvc)
for _, edp := range f.NotificationEndpoints {
if err := store.PutNotificationEndpoint(ctx, edp); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)
}
}
for _, o := range f.Orgs {
if err := tenantSvc.CreateOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)
}
}
return endpointSvc, secretSvc, func() {
for _, edp := range f.NotificationEndpoints {
if _, _, err := endpointSvc.DeleteNotificationEndpoint(ctx, edp.GetID()); err != nil && err != service.ErrNotificationEndpointNotFound {
t.Logf("failed to remove notification endpoint: %v", err)
}
}
for _, o := range f.Orgs {
if err := tenantSvc.DeleteOrganization(ctx, o.ID); err != nil {
t.Fatalf("failed to remove org: %v", err)
}
}
}
}

View File

@ -44,8 +44,8 @@ type RuleService struct {
timeGenerator influxdb.TimeGenerator
}
// NewRuleService constructs and configures a notification rule service
func NewRuleService(logger *zap.Logger, store kv.Store, tasks influxdb.TaskService, orgs influxdb.OrganizationService, endpoints influxdb.NotificationEndpointService) (*RuleService, error) {
// New constructs and configures a notification rule service
func New(logger *zap.Logger, store kv.Store, tasks influxdb.TaskService, orgs influxdb.OrganizationService, endpoints influxdb.NotificationEndpointService) (*RuleService, error) {
s := &RuleService{
log: logger,
kv: store,

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/tenant"
@ -68,7 +69,12 @@ func initNotificationRuleStore(s kv.Store, f NotificationRuleFields, t *testing.
tenantSvc = tenant.NewService(tenantStore)
)
svc, err := NewRuleService(logger, s, kvsvc, tenantSvc, kvsvc)
endpStore := endpointservice.NewStore(s)
endpStore.IDGenerator = f.IDGenerator
endpStore.TimeGenerator = f.TimeGenerator
endp := endpointservice.New(endpStore, kvsvc)
svc, err := New(logger, s, kvsvc, tenantSvc, endp)
if err != nil {
t.Fatal(err)
}
@ -88,7 +94,7 @@ func initNotificationRuleStore(s kv.Store, f NotificationRuleFields, t *testing.
}
for _, e := range f.Endpoints {
if err := kvsvc.CreateNotificationEndpoint(ctx, e, 1); err != nil {
if err := endp.CreateNotificationEndpoint(ctx, e, 1); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)
}
}

View File

@ -104,12 +104,6 @@ func (n *NotificationEndpointUpdate) Valid() error {
// NotificationEndpointService represents a service for managing notification endpoints.
type NotificationEndpointService interface {
// UserResourceMappingService must be part of all NotificationEndpointStore service,
// for create, delete.
UserResourceMappingService
// OrganizationService is needed for search filter
OrganizationService
// FindNotificationEndpointByID returns a single notification endpoint by ID.
FindNotificationEndpointByID(ctx context.Context, id ID) (NotificationEndpoint, error)

File diff suppressed because it is too large Load Diff

View File

@ -2,10 +2,12 @@ package testing
import (
"context"
"strings"
"testing"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TODO(goller): remove opPrefix argument
@ -29,14 +31,14 @@ func ErrorsEqual(t *testing.T, actual, expected error) {
t.Errorf("expected error %s but received nil", expected.Error())
}
if platform.ErrorCode(expected) != platform.ErrorCode(actual) {
if influxdb.ErrorCode(expected) != influxdb.ErrorCode(actual) {
t.Logf("\nexpected: %v\nactual: %v\n\n", expected, actual)
t.Errorf("expected error code %q but received %q", platform.ErrorCode(expected), platform.ErrorCode(actual))
t.Errorf("expected error code %q but received %q", influxdb.ErrorCode(expected), influxdb.ErrorCode(actual))
}
if platform.ErrorMessage(expected) != platform.ErrorMessage(actual) {
if influxdb.ErrorMessage(expected) != influxdb.ErrorMessage(actual) {
t.Logf("\nexpected: %v\nactual: %v\n\n", expected, actual)
t.Errorf("expected error message %q but received %q", platform.ErrorMessage(expected), platform.ErrorMessage(actual))
t.Errorf("expected error message %q but received %q", influxdb.ErrorMessage(expected), influxdb.ErrorMessage(actual))
}
}
@ -47,13 +49,13 @@ func FloatPtr(f float64) *float64 {
return p
}
func idPtr(id platform.ID) *platform.ID {
func idPtr(id influxdb.ID) *influxdb.ID {
return &id
}
// MustIDBase16 is an helper to ensure a correct ID is built during testing.
func MustIDBase16(s string) platform.ID {
id, err := platform.IDFromString(s)
func MustIDBase16(s string) influxdb.ID {
id, err := influxdb.IDFromString(s)
if err != nil {
panic(err)
}
@ -61,12 +63,12 @@ func MustIDBase16(s string) platform.ID {
}
// MustIDBase16Ptr is an helper to ensure a correct ID ptr ref is built during testing.
func MustIDBase16Ptr(s string) *platform.ID {
func MustIDBase16Ptr(s string) *influxdb.ID {
id := MustIDBase16(s)
return &id
}
func MustCreateOrgs(ctx context.Context, svc *kv.Service, os ...*platform.Organization) {
func MustCreateOrgs(ctx context.Context, svc influxdb.OrganizationService, os ...*influxdb.Organization) {
for _, o := range os {
if err := svc.CreateOrganization(ctx, o); err != nil {
panic(err)
@ -74,7 +76,7 @@ func MustCreateOrgs(ctx context.Context, svc *kv.Service, os ...*platform.Organi
}
}
func MustCreateLabels(ctx context.Context, svc *kv.Service, labels ...*platform.Label) {
func MustCreateLabels(ctx context.Context, svc influxdb.LabelService, labels ...*influxdb.Label) {
for _, l := range labels {
if err := svc.CreateLabel(ctx, l); err != nil {
panic(err)
@ -82,7 +84,7 @@ func MustCreateLabels(ctx context.Context, svc *kv.Service, labels ...*platform.
}
}
func MustCreateUsers(ctx context.Context, svc *kv.Service, us ...*platform.User) {
func MustCreateUsers(ctx context.Context, svc influxdb.UserService, us ...*influxdb.User) {
for _, u := range us {
if err := svc.CreateUser(ctx, u); err != nil {
panic(err)
@ -90,7 +92,7 @@ func MustCreateUsers(ctx context.Context, svc *kv.Service, us ...*platform.User)
}
}
func MustCreateMappings(ctx context.Context, svc *kv.Service, ms ...*platform.UserResourceMapping) {
func MustCreateMappings(ctx context.Context, svc influxdb.UserResourceMappingService, ms ...*influxdb.UserResourceMapping) {
for _, m := range ms {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
panic(err)
@ -98,36 +100,57 @@ func MustCreateMappings(ctx context.Context, svc *kv.Service, ms ...*platform.Us
}
}
func MustMakeUsersOrgOwner(ctx context.Context, svc *kv.Service, oid platform.ID, uids ...platform.ID) {
ms := make([]*platform.UserResourceMapping, len(uids))
func MustMakeUsersOrgOwner(ctx context.Context, svc influxdb.UserResourceMappingService, oid influxdb.ID, uids ...influxdb.ID) {
ms := make([]*influxdb.UserResourceMapping, len(uids))
for i, uid := range uids {
ms[i] = &platform.UserResourceMapping{
ms[i] = &influxdb.UserResourceMapping{
UserID: uid,
UserType: platform.Owner,
ResourceType: platform.OrgsResourceType,
UserType: influxdb.Owner,
ResourceType: influxdb.OrgsResourceType,
ResourceID: oid,
}
}
MustCreateMappings(ctx, svc, ms...)
}
func MustMakeUsersOrgMember(ctx context.Context, svc *kv.Service, oid platform.ID, uids ...platform.ID) {
ms := make([]*platform.UserResourceMapping, len(uids))
func MustMakeUsersOrgMember(ctx context.Context, svc influxdb.UserResourceMappingService, oid influxdb.ID, uids ...influxdb.ID) {
ms := make([]*influxdb.UserResourceMapping, len(uids))
for i, uid := range uids {
ms[i] = &platform.UserResourceMapping{
ms[i] = &influxdb.UserResourceMapping{
UserID: uid,
UserType: platform.Member,
ResourceType: platform.OrgsResourceType,
UserType: influxdb.Member,
ResourceType: influxdb.OrgsResourceType,
ResourceID: oid,
}
}
MustCreateMappings(ctx, svc, ms...)
}
func MustNewPermissionAtID(id platform.ID, a platform.Action, rt platform.ResourceType, orgID platform.ID) *platform.Permission {
perm, err := platform.NewPermissionAtID(id, a, rt, orgID)
func MustNewPermissionAtID(id influxdb.ID, a influxdb.Action, rt influxdb.ResourceType, orgID influxdb.ID) *influxdb.Permission {
perm, err := influxdb.NewPermissionAtID(id, a, rt, orgID)
if err != nil {
panic(err)
}
return perm
}
func influxErrsEqual(t *testing.T, expected *influxdb.Error, actual error) {
t.Helper()
if expected != nil {
require.Error(t, actual)
}
if actual == nil {
return
}
if expected == nil {
require.NoError(t, actual)
return
}
iErr, ok := actual.(*influxdb.Error)
require.True(t, ok)
assert.Equal(t, expected.Code, iErr.Code)
assert.Truef(t, strings.HasPrefix(iErr.Error(), expected.Error()), "expected: %s got err: %s", expected.Error(), actual.Error())
}