refactor: move checks service to new package (#18671)
parent
a6a5d0d43f
commit
b35479640a
|
@ -0,0 +1,566 @@
|
|||
package checks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"github.com/influxdata/influxdb/v2/notification/check"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/v2/snowflake"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ influxdb.CheckService = (*Service)(nil)
|
||||
|
||||
// Service is a check service
|
||||
// It provides all the operations needed to manage checks
|
||||
type Service struct {
|
||||
kv kv.Store
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
orgs influxdb.OrganizationService
|
||||
tasks influxdb.TaskService
|
||||
|
||||
timeGenerator influxdb.TimeGenerator
|
||||
idGenerator influxdb.IDGenerator
|
||||
|
||||
checkStore *kv.IndexStore
|
||||
}
|
||||
|
||||
// NewService constructs and configures a new checks.Service
|
||||
func NewService(logger *zap.Logger, store kv.Store, orgs influxdb.OrganizationService, tasks influxdb.TaskService) *Service {
|
||||
return &Service{
|
||||
kv: store,
|
||||
log: logger,
|
||||
orgs: orgs,
|
||||
tasks: tasks,
|
||||
|
||||
timeGenerator: influxdb.RealTimeGenerator{},
|
||||
idGenerator: snowflake.NewIDGenerator(),
|
||||
checkStore: newCheckStore(),
|
||||
}
|
||||
}
|
||||
|
||||
func newCheckStore() *kv.IndexStore {
|
||||
const resource = "check"
|
||||
|
||||
var decEndpointEntFn kv.DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) {
|
||||
ch, err := check.UnmarshalJSON(val)
|
||||
return key, ch, err
|
||||
}
|
||||
|
||||
var decValToEntFn kv.ConvertValToEntFn = func(_ []byte, v interface{}) (kv.Entity, error) {
|
||||
ch, ok := v.(influxdb.Check)
|
||||
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
|
||||
return kv.Entity{}, err
|
||||
}
|
||||
return kv.Entity{
|
||||
PK: kv.EncID(ch.GetID()),
|
||||
UniqueKey: kv.Encode(
|
||||
kv.EncID(ch.GetOrgID()),
|
||||
kv.EncString(ch.GetName()),
|
||||
),
|
||||
Body: ch,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &kv.IndexStore{
|
||||
Resource: resource,
|
||||
EntStore: kv.NewStoreBase(
|
||||
resource,
|
||||
[]byte("checksv1"),
|
||||
kv.EncIDKey,
|
||||
kv.EncBodyJSON,
|
||||
decEndpointEntFn,
|
||||
decValToEntFn,
|
||||
),
|
||||
IndexStore: kv.NewOrgNameKeyStore(resource, []byte("checkindexv1"), false),
|
||||
}
|
||||
}
|
||||
|
||||
// FindCheckByID retrieves a check by id.
|
||||
func (s *Service) FindCheckByID(ctx context.Context, id influxdb.ID) (influxdb.Check, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
var c influxdb.Check
|
||||
err := s.kv.View(ctx, func(tx kv.Tx) error {
|
||||
chkVal, err := s.findCheckByID(ctx, tx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c = chkVal
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (s *Service) findCheckByID(ctx context.Context, tx kv.Tx, id influxdb.ID) (influxdb.Check, error) {
|
||||
chkVal, err := s.checkStore.FindEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return chkVal.(influxdb.Check), nil
|
||||
}
|
||||
|
||||
func (s *Service) findCheckByName(ctx context.Context, tx kv.Tx, orgID influxdb.ID, name string) (influxdb.Check, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
chVal, err := s.checkStore.FindEnt(ctx, tx, kv.Entity{
|
||||
UniqueKey: kv.Encode(kv.EncID(orgID), kv.EncString(name)),
|
||||
})
|
||||
if kv.IsNotFound(err) {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return chVal.(influxdb.Check), nil
|
||||
}
|
||||
|
||||
// FindCheck retrives a check using an arbitrary check filter.
|
||||
// Filters using ID, or OrganizationID and check Name should be efficient.
|
||||
// Other filters will do a linear scan across checks until it finds a match.
|
||||
func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (influxdb.Check, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
if filter.ID != nil {
|
||||
return s.FindCheckByID(ctx, *filter.ID)
|
||||
}
|
||||
|
||||
if filter.Org != nil {
|
||||
o, err := s.orgs.FindOrganization(ctx, influxdb.OrganizationFilter{Name: filter.Org})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filter.OrgID = &o.ID
|
||||
}
|
||||
|
||||
var c influxdb.Check
|
||||
err := s.kv.View(ctx, func(tx kv.Tx) error {
|
||||
if filter.OrgID != nil && filter.Name != nil {
|
||||
ch, err := s.findCheckByName(ctx, tx, *filter.OrgID, *filter.Name)
|
||||
c = ch
|
||||
return err
|
||||
}
|
||||
|
||||
var prefix []byte
|
||||
if filter.OrgID != nil {
|
||||
ent := kv.Entity{UniqueKey: kv.EncID(*filter.OrgID)}
|
||||
prefix, _ = s.checkStore.IndexStore.EntKey(ctx, ent)
|
||||
}
|
||||
|
||||
filterFn := filterChecksFn(filter)
|
||||
return s.checkStore.Find(ctx, tx, kv.FindOpts{
|
||||
Prefix: prefix,
|
||||
Limit: 1,
|
||||
FilterEntFn: func(k []byte, v interface{}) bool {
|
||||
ch, ok := v.(influxdb.Check)
|
||||
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
|
||||
return false
|
||||
}
|
||||
return filterFn(ch)
|
||||
},
|
||||
CaptureFn: func(key []byte, decodedVal interface{}) error {
|
||||
c, _ = decodedVal.(influxdb.Check)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if c == nil {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Msg: "check not found",
|
||||
}
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func filterChecksFn(filter influxdb.CheckFilter) func(c influxdb.Check) bool {
|
||||
return func(c influxdb.Check) bool {
|
||||
if filter.ID != nil && c.GetID() != *filter.ID {
|
||||
return false
|
||||
}
|
||||
if filter.OrgID != nil && c.GetOrgID() != *filter.OrgID {
|
||||
return false
|
||||
}
|
||||
if filter.Name != nil && c.GetName() != *filter.Name {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// FindChecks retrieves all checks that match an arbitrary check filter.
|
||||
// Filters using ID, or OrganizationID and check Name should be efficient.
|
||||
// Other filters will do a linear scan across all checks searching for a match.
|
||||
func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, opts ...influxdb.FindOptions) ([]influxdb.Check, int, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
if filter.ID != nil {
|
||||
c, err := s.FindCheckByID(ctx, *filter.ID)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return []influxdb.Check{c}, 1, nil
|
||||
}
|
||||
|
||||
if filter.Org != nil {
|
||||
o, err := s.orgs.FindOrganization(ctx, influxdb.OrganizationFilter{Name: filter.Org})
|
||||
if err != nil {
|
||||
return nil, 0, &influxdb.Error{Err: err}
|
||||
}
|
||||
|
||||
filter.OrgID = &o.ID
|
||||
}
|
||||
|
||||
var checks []influxdb.Check
|
||||
err := s.kv.View(ctx, func(tx kv.Tx) error {
|
||||
var opt influxdb.FindOptions
|
||||
if len(opts) > 0 {
|
||||
opt = opts[0]
|
||||
}
|
||||
|
||||
filterFn := filterChecksFn(filter)
|
||||
return s.checkStore.Find(ctx, tx, kv.FindOpts{
|
||||
Descending: opt.Descending,
|
||||
Offset: opt.Offset,
|
||||
Limit: opt.Limit,
|
||||
FilterEntFn: func(k []byte, v interface{}) bool {
|
||||
ch, ok := v.(influxdb.Check)
|
||||
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
|
||||
return false
|
||||
}
|
||||
return filterFn(ch)
|
||||
},
|
||||
CaptureFn: func(key []byte, decodedVal interface{}) error {
|
||||
c, ok := decodedVal.(influxdb.Check)
|
||||
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
|
||||
return err
|
||||
}
|
||||
checks = append(checks, c)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return checks, len(checks), nil
|
||||
}
|
||||
|
||||
// CreateCheck creates a influxdb check and sets ID.
|
||||
func (s *Service) CreateCheck(ctx context.Context, c influxdb.CheckCreate, userID influxdb.ID) (err error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
if err := c.Status.Valid(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.GetOrgID().Valid() {
|
||||
if _, err := s.orgs.FindOrganizationByID(ctx, c.GetOrgID()); err != nil {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.ENotFound,
|
||||
Op: influxdb.OpCreateCheck,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.SetID(s.idGenerator.ID())
|
||||
c.SetOwnerID(userID)
|
||||
now := s.timeGenerator.Now()
|
||||
c.SetCreatedAt(now)
|
||||
c.SetUpdatedAt(now)
|
||||
|
||||
if err := c.Valid(fluxlang.DefaultService); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create task initially in inactive state
|
||||
t, err := s.createCheckTask(ctx, c)
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "Could not create task from check",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
c.SetTaskID(t.ID)
|
||||
|
||||
err = s.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
return s.putCheck(ctx, tx, c, kv.PutNew())
|
||||
})
|
||||
|
||||
// something went wrong persisting new check
|
||||
// so remove associated task
|
||||
if err != nil {
|
||||
if derr := s.tasks.DeleteTask(ctx, t.ID); derr != nil {
|
||||
msg := fmt.Sprintf("error removing task %q for check %q in org %q", t.ID, c.GetName(), c.GetOrgID())
|
||||
s.log.Error(msg, zap.Error(derr))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// update task to be in matching state to check
|
||||
if influxdb.Status(t.Status) != c.Status {
|
||||
_, err = s.tasks.UpdateTask(ctx, t.ID, influxdb.TaskUpdate{
|
||||
Status: strPtr(string(c.Status)),
|
||||
})
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) createCheckTask(ctx context.Context, c influxdb.CheckCreate) (*influxdb.Task, error) {
|
||||
script, err := c.GenerateFlux(fluxlang.DefaultService)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc := influxdb.TaskCreate{
|
||||
Type: c.Type(),
|
||||
Flux: script,
|
||||
OwnerID: c.GetOwnerID(),
|
||||
OrganizationID: c.GetOrgID(),
|
||||
// task initially in inactive state to ensure it isn't
|
||||
// scheduled until check is persisted and active
|
||||
Status: string(influxdb.Inactive),
|
||||
}
|
||||
|
||||
t, err := s.tasks.CreateTask(ctx, tc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// PutCheck will put a check without setting an ID.
|
||||
func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {
|
||||
if err := c.Valid(fluxlang.DefaultService); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
return s.putCheck(ctx, tx, c)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) putCheck(ctx context.Context, tx kv.Tx, c influxdb.Check, opts ...kv.PutOptionFn) error {
|
||||
return s.checkStore.Put(ctx, tx, kv.Entity{
|
||||
PK: kv.EncID(c.GetID()),
|
||||
UniqueKey: kv.Encode(kv.EncID(c.GetOrgID()), kv.EncString(c.GetName())),
|
||||
Body: c,
|
||||
}, opts...)
|
||||
}
|
||||
|
||||
// PatchCheck updates a check according the parameters set on upd.
|
||||
func (s *Service) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
var check influxdb.Check
|
||||
if err := s.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
c, err := s.findCheckByID(ctx, tx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err = s.patchCheck(ctx, tx, c, upd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
check = c
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.patchCheckTask(ctx, check.GetTaskID(), upd); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return check, nil
|
||||
}
|
||||
|
||||
// UpdateCheck updates the check.
|
||||
func (s *Service) UpdateCheck(ctx context.Context, id influxdb.ID, chk influxdb.CheckCreate) (influxdb.Check, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
var check influxdb.Check
|
||||
if err := s.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
c, err := s.updateCheck(ctx, tx, id, chk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
check = c
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.updateCheckTask(ctx, chk); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return check, nil
|
||||
}
|
||||
|
||||
func (s *Service) updateCheckTask(ctx context.Context, chk influxdb.CheckCreate) error {
|
||||
flux, err := chk.GenerateFlux(fluxlang.DefaultService)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tu := influxdb.TaskUpdate{
|
||||
Flux: &flux,
|
||||
Description: strPtr(chk.GetDescription()),
|
||||
}
|
||||
|
||||
if chk.Status != "" {
|
||||
tu.Status = strPtr(string(chk.Status))
|
||||
}
|
||||
|
||||
if _, err := s.tasks.UpdateTask(ctx, chk.GetTaskID(), tu); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) patchCheckTask(ctx context.Context, taskID influxdb.ID, upd influxdb.CheckUpdate) error {
|
||||
tu := influxdb.TaskUpdate{
|
||||
Description: upd.Description,
|
||||
}
|
||||
|
||||
if upd.Status != nil {
|
||||
tu.Status = strPtr(string(*upd.Status))
|
||||
}
|
||||
|
||||
if _, err := s.tasks.UpdateTask(ctx, taskID, tu); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) updateCheck(ctx context.Context, tx kv.Tx, id influxdb.ID, chk influxdb.CheckCreate) (influxdb.Check, error) {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
current, err := s.findCheckByID(ctx, tx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chk.SetTaskID(current.GetTaskID())
|
||||
|
||||
if chk.GetName() != current.GetName() {
|
||||
c0, err := s.findCheckByName(ctx, tx, current.GetOrgID(), chk.GetName())
|
||||
if err == nil && c0.GetID() != id {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EConflict,
|
||||
Msg: "check name is not unique",
|
||||
}
|
||||
}
|
||||
|
||||
ent := kv.Entity{
|
||||
UniqueKey: kv.Encode(kv.EncID(current.GetOrgID()), kv.EncString(current.GetName())),
|
||||
}
|
||||
if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// ID and OrganizationID can not be updated
|
||||
chk.SetID(current.GetID())
|
||||
chk.SetOrgID(current.GetOrgID())
|
||||
chk.SetOwnerID(current.GetOwnerID())
|
||||
chk.SetCreatedAt(current.GetCRUDLog().CreatedAt)
|
||||
chk.SetUpdatedAt(s.timeGenerator.Now())
|
||||
|
||||
if err := chk.Valid(fluxlang.DefaultService); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := chk.Status.Valid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.putCheck(ctx, tx, chk.Check); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return chk.Check, nil
|
||||
}
|
||||
|
||||
func (s *Service) patchCheck(ctx context.Context, tx kv.Tx, check influxdb.Check, upd influxdb.CheckUpdate) (influxdb.Check, error) {
|
||||
if upd.Name != nil {
|
||||
check.SetName(*upd.Name)
|
||||
}
|
||||
|
||||
if upd.Description != nil {
|
||||
check.SetDescription(*upd.Description)
|
||||
}
|
||||
|
||||
check.SetUpdatedAt(s.timeGenerator.Now())
|
||||
|
||||
if err := check.Valid(fluxlang.DefaultService); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.putCheck(ctx, tx, check, kv.PutUpdate()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return check, nil
|
||||
}
|
||||
|
||||
// DeleteCheck deletes a check and prunes it from the index.
|
||||
func (s *Service) DeleteCheck(ctx context.Context, id influxdb.ID) error {
|
||||
ch, err := s.FindCheckByID(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.tasks.DeleteTask(ctx, ch.GetTaskID()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.kv.Update(ctx, func(tx kv.Tx) error {
|
||||
return s.checkStore.DeleteEnt(ctx, tx, kv.Entity{
|
||||
PK: kv.EncID(id),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func strPtr(s string) *string {
|
||||
ss := new(string)
|
||||
*ss = s
|
||||
return ss
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,84 @@
|
|||
package checks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/inmem"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
_ "github.com/influxdata/influxdb/v2/query/builtin"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func NewKVTestStore(t *testing.T) (kv.Store, func()) {
|
||||
return inmem.NewKVStore(), func() {}
|
||||
}
|
||||
|
||||
func TestCheckService(t *testing.T) {
|
||||
CheckService(initCheckService, t)
|
||||
}
|
||||
|
||||
func initCheckService(f CheckFields, t *testing.T) (influxdb.CheckService, influxdb.TaskService, string, func()) {
|
||||
store, closeKVStore := NewKVTestStore(t)
|
||||
logger := zaptest.NewLogger(t)
|
||||
svc := kv.NewService(logger, store, kv.ServiceConfig{
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
})
|
||||
svc.IDGenerator = f.IDGenerator
|
||||
svc.TimeGenerator = f.TimeGenerator
|
||||
if f.TimeGenerator == nil {
|
||||
svc.TimeGenerator = influxdb.RealTimeGenerator{}
|
||||
}
|
||||
|
||||
checkService := NewService(logger, store, svc, svc)
|
||||
checkService.idGenerator = f.IDGenerator
|
||||
if f.TimeGenerator != nil {
|
||||
checkService.timeGenerator = f.TimeGenerator
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if err := svc.Initialize(ctx); err != nil {
|
||||
t.Fatalf("error initializing check service: %v", err)
|
||||
}
|
||||
for _, m := range f.UserResourceMappings {
|
||||
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
|
||||
t.Fatalf("failed to populate user resource mapping: %v", err)
|
||||
}
|
||||
}
|
||||
for _, o := range f.Organizations {
|
||||
if err := svc.PutOrganization(ctx, o); err != nil {
|
||||
t.Fatalf("failed to populate organizations")
|
||||
}
|
||||
}
|
||||
for _, c := range f.Checks {
|
||||
if err := checkService.PutCheck(ctx, c); err != nil {
|
||||
t.Fatalf("failed to populate checks")
|
||||
}
|
||||
}
|
||||
for _, tc := range f.Tasks {
|
||||
if _, err := svc.CreateTask(ctx, tc); err != nil {
|
||||
t.Fatalf("failed to populate tasks: %v", err)
|
||||
}
|
||||
}
|
||||
return checkService, svc, kv.OpPrefix, func() {
|
||||
for _, o := range f.Organizations {
|
||||
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
|
||||
t.Logf("failed to remove organization: %v", err)
|
||||
}
|
||||
}
|
||||
for _, urm := range f.UserResourceMappings {
|
||||
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
|
||||
t.Logf("failed to remove urm rule: %v", err)
|
||||
}
|
||||
}
|
||||
for _, c := range f.Checks {
|
||||
if err := checkService.DeleteCheck(ctx, c.GetID()); err != nil {
|
||||
t.Logf("failed to remove check: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
closeKVStore()
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/authorization"
|
||||
"github.com/influxdata/influxdb/v2/authorizer"
|
||||
"github.com/influxdata/influxdb/v2/bolt"
|
||||
"github.com/influxdata/influxdb/v2/checks"
|
||||
"github.com/influxdata/influxdb/v2/chronograf/server"
|
||||
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
|
||||
"github.com/influxdata/influxdb/v2/dbrp"
|
||||
|
@ -815,7 +816,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
var checkSvc platform.CheckService
|
||||
{
|
||||
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
|
||||
checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator)
|
||||
checkSvc = checks.NewService(m.log.With(zap.String("svc", "checks")), m.kvStore, m.kvService, m.kvService)
|
||||
checkSvc = middleware.NewCheckService(checkSvc, m.kvService, coordinator)
|
||||
}
|
||||
|
||||
var notificationRuleSvc platform.NotificationRuleStore
|
||||
|
|
Loading…
Reference in New Issue