feat(kv): add check kv

pull/14521/head
Jade McGough 2019-07-19 10:43:29 -07:00 committed by Kelvin Wang
parent ec9ecf23a1
commit 0e5091ed21
5 changed files with 2086 additions and 0 deletions

704
kv/check.go Normal file
View File

@ -0,0 +1,704 @@
package kv
import (
"context"
"encoding/json"
"fmt"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/notification/check"
)
var (
checkBucket = []byte("checksv1")
checkIndex = []byte("checkindexv1")
)
var _ influxdb.CheckService = (*Service)(nil)
func (s *Service) initializeChecks(ctx context.Context, tx Tx) error {
if _, err := s.checksBucket(tx); err != nil {
return err
}
if _, err := s.checksIndexBucket(tx); err != nil {
return err
}
return nil
}
// UnexpectedCheckError is used when the error comes from an internal system.
func UnexpectedCheckError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("unexpected error retrieving check's bucket; Err %v", err),
Op: "kv/checkBucket",
}
}
// UnexpectedCheckIndexError is used when the error comes from an internal system.
func UnexpectedCheckIndexError(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: fmt.Sprintf("unexpected error retrieving check's index bucket; Err %v", err),
Op: "kv/checkIndex",
}
}
func (s *Service) checksBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(checkBucket)
if err != nil {
return nil, UnexpectedCheckError(err)
}
return b, nil
}
func (s *Service) checksIndexBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(checkIndex)
if err != nil {
return nil, UnexpectedCheckIndexError(err)
}
return b, nil
}
// FindCheckByID retrieves a check by id.
func (s *Service) FindCheckByID(ctx context.Context, id influxdb.ID) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
var err error
err = s.kv.View(ctx, func(tx Tx) error {
chk, pe := s.findCheckByID(ctx, tx, id)
if pe != nil {
err = pe
return err
}
c = chk
return nil
})
if err != nil {
return nil, err
}
return c, nil
}
func (s *Service) findCheckByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.Check, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
encodedID, err := id.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
bkt, err := s.checksBucket(tx)
if err != nil {
return nil, err
}
v, err := bkt.Get(encodedID)
if IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: "check not found",
}
}
if err != nil {
return nil, err
}
c, err := check.UnmarshalJSON(v)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return c, nil
}
// FindCheck retrives a check using an arbitrary check filter.
// Filters using ID, or OrganizationID and check Name should be efficient.
// Other filters will do a linear scan across checks until it finds a match.
func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
var err error
if filter.ID != nil {
c, err = s.FindCheckByID(ctx, *filter.ID)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return c, nil
}
err = s.kv.View(ctx, func(tx Tx) error {
if filter.Org != nil {
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return err
}
filter.OrgID = &o.ID
}
filterFn := filterChecksFn(filter)
return s.forEachCheck(ctx, tx, false, func(chk influxdb.Check) bool {
if filterFn(chk) {
c = chk
return false
}
return true
})
})
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
if c == nil {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: "check not found",
}
}
return c, nil
}
func filterChecksFn(filter influxdb.CheckFilter) func(c influxdb.Check) bool {
return func(c influxdb.Check) bool {
if filter.ID != nil {
if c.GetID() != *filter.ID {
return false
}
}
if filter.Name != nil {
if c.GetName() != *filter.Name {
return false
}
}
if filter.OrgID != nil {
if c.GetOrgID() != *filter.OrgID {
return false
}
}
return true
}
}
// FindChecks retrives all checks that match an arbitrary check filter.
// Filters using ID, or OrganizationID and check Name should be efficient.
// Other filters will do a linear scan across all checks searching for a match.
func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, opts ...influxdb.FindOptions) ([]influxdb.Check, int, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if filter.ID != nil {
c, err := s.FindCheckByID(ctx, *filter.ID)
if err != nil {
return nil, 0, err
}
return []influxdb.Check{c}, 1, nil
}
cs := []influxdb.Check{}
err := s.kv.View(ctx, func(tx Tx) error {
chks, err := s.findChecks(ctx, tx, filter, opts...)
if err != nil {
return err
}
cs = chks
return nil
})
if err != nil {
return nil, 0, err
}
return cs, len(cs), nil
}
func (s *Service) findChecks(ctx context.Context, tx Tx, filter influxdb.CheckFilter, opts ...influxdb.FindOptions) ([]influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
cs := []influxdb.Check{}
if filter.Org != nil {
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
filter.OrgID = &o.ID
}
var offset, limit, count int
var descending bool
if len(opts) > 0 {
offset = opts[0].Offset
limit = opts[0].Limit
descending = opts[0].Descending
}
filterFn := filterChecksFn(filter)
err := s.forEachCheck(ctx, tx, descending, func(c influxdb.Check) bool {
if filterFn(c) {
if count >= offset {
cs = append(cs, c)
}
count++
}
if limit > 0 && len(cs) >= limit {
return false
}
return true
})
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return cs, nil
}
// CreateCheck creates a influxdb check and sets ID.
func (s *Service) CreateCheck(ctx context.Context, c influxdb.Check) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
return s.kv.Update(ctx, func(tx Tx) error {
return s.createCheck(ctx, tx, c)
})
}
func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.Check) error {
if c.GetOrgID().Valid() {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
_, pe := s.findOrganizationByID(ctx, tx, c.GetOrgID())
if pe != nil {
return &influxdb.Error{
Op: influxdb.OpCreateCheck,
Err: pe,
}
}
}
// check name unique
if _, err := s.findCheckByName(ctx, tx, c.GetOrgID(), c.GetName()); err == nil {
if err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("check with name %s already exists", c.GetName()),
}
}
}
c.SetID(s.IDGenerator.ID())
c.SetCreatedAt(s.Now())
c.SetUpdatedAt(s.Now())
if err := s.putCheck(ctx, tx, c); err != nil {
return err
}
if err := s.createCheckUserResourceMappings(ctx, tx, c); err != nil {
return err
}
return nil
}
// PutCheck will put a check without setting an ID.
func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {
return s.kv.Update(ctx, func(tx Tx) error {
var err error
pe := s.putCheck(ctx, tx, c)
if pe != nil {
err = pe
}
return err
})
}
func (s *Service) createCheckUserResourceMappings(ctx context.Context, tx Tx, c influxdb.Check) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
ms, err := s.findUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceType: influxdb.OrgsResourceType,
ResourceID: c.GetOrgID(),
})
if err != nil {
return &influxdb.Error{
Err: err,
}
}
for _, m := range ms {
if err := s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{
ResourceType: influxdb.ChecksResourceType,
ResourceID: c.GetID(),
UserID: m.UserID,
UserType: m.UserType,
}); err != nil {
return &influxdb.Error{
Err: err,
}
}
}
return nil
}
func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if err := c.Valid(); err != nil {
return err
}
v, err := json.Marshal(c)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
encodedID, err := c.GetID().Encode()
if err != nil {
return &influxdb.Error{
Err: err,
}
}
key, pe := checkIndexKey(c.GetOrgID(), c.GetName())
if err != nil {
return pe
}
idx, err := s.checksIndexBucket(tx)
if err != nil {
return err
}
if err := idx.Put(key, encodedID); err != nil {
return &influxdb.Error{
Err: err,
}
}
bkt, err := s.checksBucket(tx)
if bkt.Put(encodedID, v); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
// checkIndexKey is a combination of the orgID and the check name.
func checkIndexKey(orgID influxdb.ID, name string) ([]byte, error) {
orgIDEncoded, err := orgID.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
k := make([]byte, influxdb.IDLength+len(name))
copy(k, orgIDEncoded)
copy(k[influxdb.IDLength:], []byte(name))
return k, nil
}
// forEachCheck will iterate through all checks while fn returns true.
func (s *Service) forEachCheck(ctx context.Context, tx Tx, descending bool, fn func(influxdb.Check) bool) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
bkt, err := s.checksBucket(tx)
if err != nil {
return err
}
cur, err := bkt.Cursor()
if err != nil {
return err
}
var k, v []byte
if descending {
k, v = cur.Last()
} else {
k, v = cur.First()
}
for k != nil {
c, err := check.UnmarshalJSON(v)
if err != nil {
return err
}
if !fn(c) {
break
}
if descending {
k, v = cur.Prev()
} else {
k, v = cur.Next()
}
}
return nil
}
// PatchCheck updates a check according the parameters set on upd.
func (s *Service) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
err := s.kv.Update(ctx, func(tx Tx) error {
chk, err := s.patchCheck(ctx, tx, id, upd)
if err != nil {
return err
}
c = chk
return nil
})
return c, err
}
// UpdateCheck updates the check.
func (s *Service) UpdateCheck(ctx context.Context, id influxdb.ID, chk influxdb.Check) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var c influxdb.Check
err := s.kv.Update(ctx, func(tx Tx) error {
chk, err := s.updateCheck(ctx, tx, id, chk)
if err != nil {
return err
}
c = chk
return nil
})
return c, err
}
func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk influxdb.Check) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
current, err := s.findCheckByID(ctx, tx, id)
if err != nil {
return nil, err
}
if chk.GetName() != current.GetName() {
c0, err := s.findCheckByName(ctx, tx, current.GetOrgID(), chk.GetName())
if err == nil && c0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "check name is not unique",
}
}
key, err := checkIndexKey(current.GetOrgID(), current.GetName())
if err != nil {
return nil, err
}
idx, err := s.checksIndexBucket(tx)
if err != nil {
return nil, err
}
if err := idx.Delete(key); err != nil {
return nil, err
}
}
// ID and OrganizationID can not be updated
chk.SetID(current.GetID())
chk.SetOrgID(current.GetOrgID())
chk.SetCreatedAt(current.GetCRUDLog().CreatedAt)
chk.SetUpdatedAt(s.Now())
if err := s.putCheck(ctx, tx, chk); err != nil {
return nil, err
}
return chk, nil
}
func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
c, err := s.findCheckByID(ctx, tx, id)
if err != nil {
return nil, err
}
if upd.Name != nil {
c0, err := s.findCheckByName(ctx, tx, c.GetOrgID(), *upd.Name)
if err == nil && c0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "check name is not unique",
}
}
key, err := checkIndexKey(c.GetOrgID(), c.GetName())
if err != nil {
return nil, err
}
idx, err := s.checksIndexBucket(tx)
if err != nil {
return nil, err
}
if err := idx.Delete(key); err != nil {
return nil, err
}
c.SetName(*upd.Name)
}
if upd.Description != nil {
c.SetDescription(*upd.Description)
}
if upd.Status != nil {
c.SetStatus(*upd.Status)
}
c.SetUpdatedAt(s.Now())
if err := s.putCheck(ctx, tx, c); err != nil {
return nil, err
}
return c, nil
}
func (s *Service) findCheckByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (influxdb.Check, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
key, err := checkIndexKey(orgID, n)
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
idx, err := s.checksIndexBucket(tx)
if err != nil {
return nil, err
}
buf, err := idx.Get(key)
if IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: fmt.Sprintf("check %q not found", n),
}
}
if err != nil {
return nil, err
}
var id influxdb.ID
if err := id.Decode(buf); err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return s.findCheckByID(ctx, tx, id)
}
// DeleteCheck deletes a check and prunes it from the index.
func (s *Service) DeleteCheck(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
var err error
if pe := s.deleteCheck(ctx, tx, id); pe != nil {
err = pe
}
return err
})
}
func (s *Service) deleteCheck(ctx context.Context, tx Tx, id influxdb.ID) error {
c, pe := s.findCheckByID(ctx, tx, id)
if pe != nil {
return pe
}
key, pe := checkIndexKey(c.GetOrgID(), c.GetName())
if pe != nil {
return pe
}
idx, err := s.checksIndexBucket(tx)
if err != nil {
return err
}
if err := idx.Delete(key); err != nil {
return &influxdb.Error{
Err: err,
}
}
encodedID, err := id.Encode()
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
bkt, err := s.checksBucket(tx)
if err != nil {
return err
}
if err := bkt.Delete(encodedID); err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := s.deleteUserResourceMappings(ctx, tx, influxdb.UserResourceMappingFilter{
ResourceID: id,
ResourceType: influxdb.ChecksResourceType,
}); err != nil {
return err
}
return nil
}

90
kv/check_test.go Normal file
View File

@ -0,0 +1,90 @@
package kv_test
import (
"context"
"testing"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kv"
influxdbtesting "github.com/influxdata/influxdb/testing"
)
func TestBoltCheckService(t *testing.T) {
influxdbtesting.CheckService(initBoltCheckService, t)
}
func TestInmemCheckService(t *testing.T) {
influxdbtesting.CheckService(initInmemCheckService, t)
}
func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, string, func()) {
s, closeBolt, err := NewTestBoltStore()
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initCheckService(s, f, t)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func initInmemCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, string, func()) {
s, closeBolt, err := NewTestInmemStore()
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initCheckService(s, f, t)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func initCheckService(s kv.Store, f influxdbtesting.CheckFields, t *testing.T) (influxdb.CheckService, string, func()) {
svc := kv.NewService(s)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
if f.TimeGenerator == nil {
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing check service: %v", err)
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)
}
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
}
}
for _, c := range f.Checks {
if err := svc.PutCheck(ctx, c); err != nil {
t.Fatalf("failed to populate checks")
}
}
return svc, kv.OpPrefix, func() {
for _, o := range f.Organizations {
if err := svc.DeleteOrganization(ctx, o.ID); err != nil {
t.Logf("failed to remove organization: %v", err)
}
}
for _, urm := range f.UserResourceMappings {
if err := svc.DeleteUserResourceMapping(ctx, urm.ResourceID, urm.UserID); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
t.Logf("failed to remove urm rule: %v", err)
}
}
for _, c := range f.Checks {
if err := svc.DeleteCheck(ctx, c.GetID()); err != nil {
t.Logf("failed to remove check: %v", err)
}
}
}
}

View File

@ -126,6 +126,10 @@ func (s *Service) Initialize(ctx context.Context) error {
return err
}
if err := s.initializeChecks(ctx, tx); err != nil {
return err
}
if err := s.initializeNotificationRule(ctx, tx); err != nil {
return err
}

1281
testing/checks.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -38,6 +38,13 @@ func ErrorsEqual(t *testing.T, actual, expected error) {
}
}
// FloatPtr takes the ref of a float number.
func FloatPtr(f float64) *float64 {
p := new(float64)
*p = f
return p
}
func idPtr(id platform.ID) *platform.ID {
return &id
}