// influxdb/kv/check.go
//
// 705 lines
// 14 KiB
// Go

package kv
import (
"context"
"encoding/json"
"fmt"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/notification/check"
)
var (
// checkBucket names the kv bucket in which checks are stored as JSON,
// keyed by their encoded ID.
checkBucket = []byte("checksv1")
// checkIndex names the kv bucket that maps an index key (encoded
// organization ID followed by the check name) to the encoded check ID.
checkIndex = []byte("checkindexv1")
)
// Compile-time assertion that *Service implements influxdb.CheckService.
var _ influxdb.CheckService = (*Service)(nil)
// initializeChecks verifies that both the checks bucket and the checks index
// bucket can be obtained from tx. It returns the first error encountered, or
// nil when both lookups succeed.
func (s *Service) initializeChecks(ctx context.Context, tx Tx) error {
	_, err := s.checksBucket(tx)
	if err != nil {
		return err
	}

	_, err = s.checksIndexBucket(tx)
	return err
}
// UnexpectedCheckError builds the internal-error value reported when the
// checks bucket cannot be retrieved. The text of err is embedded in the
// message.
func UnexpectedCheckError(err error) *influxdb.Error {
	msg := fmt.Sprintf("unexpected error retrieving check's bucket; Err %v", err)
	return &influxdb.Error{
		Code: influxdb.EInternal,
		Msg:  msg,
		Op:   "kv/checkBucket",
	}
}
// UnexpectedCheckIndexError builds the internal-error value reported when the
// checks index bucket cannot be retrieved. The text of err is embedded in the
// message.
func UnexpectedCheckIndexError(err error) *influxdb.Error {
	msg := fmt.Sprintf("unexpected error retrieving check's index bucket; Err %v", err)
	return &influxdb.Error{
		Code: influxdb.EInternal,
		Msg:  msg,
		Op:   "kv/checkIndex",
	}
}
// checksBucket returns the bucket that holds checks. A lookup failure is
// reported as an UnexpectedCheckError.
func (s *Service) checksBucket(tx Tx) (Bucket, error) {
	bkt, err := tx.Bucket(checkBucket)
	if err == nil {
		return bkt, nil
	}
	return nil, UnexpectedCheckError(err)
}
// checksIndexBucket returns the bucket that holds the check name index. A
// lookup failure is reported as an UnexpectedCheckIndexError.
func (s *Service) checksIndexBucket(tx Tx) (Bucket, error) {
	bkt, err := tx.Bucket(checkIndex)
	if err == nil {
		return bkt, nil
	}
	return nil, UnexpectedCheckIndexError(err)
}
// FindCheckByID looks up the check stored under id inside a kv View
// transaction. It returns a nil check together with the error when the
// lookup or the transaction fails.
func (s *Service) FindCheckByID(ctx context.Context, id influxdb.ID) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	var found influxdb.Check
	lookup := func(tx Tx) error {
		chk, err := s.findCheckByID(ctx, tx, id)
		if err != nil {
			return err
		}
		found = chk
		return nil
	}

	if err := s.kv.View(ctx, lookup); err != nil {
		return nil, err
	}
	return found, nil
}
// findCheckByID reads the check stored under id from the checks bucket of tx
// and decodes it from JSON.
//
// An id that cannot be encoded yields an EInvalid error, a missing key yields
// an ENotFound error, and a decoding failure is wrapped in an influxdb.Error.
func (s *Service) findCheckByID(ctx context.Context, tx Tx, id influxdb.ID) (influxdb.Check, error) {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	key, err := id.Encode()
	if err != nil {
		return nil, &influxdb.Error{Code: influxdb.EInvalid, Err: err}
	}

	checks, err := s.checksBucket(tx)
	if err != nil {
		return nil, err
	}

	raw, err := checks.Get(key)
	switch {
	case IsNotFound(err):
		return nil, &influxdb.Error{Code: influxdb.ENotFound, Msg: "check not found"}
	case err != nil:
		return nil, err
	}

	chk, err := check.UnmarshalJSON(raw)
	if err != nil {
		return nil, &influxdb.Error{Err: err}
	}
	return chk, nil
}
// FindCheck retrieves the first check that matches filter.
//
// When filter.ID is set the check is fetched directly by ID. Otherwise the
// organization name in filter.Org, if any, is resolved to an ID and the checks
// are scanned in ascending key order until one satisfies the filter. An
// ENotFound error is returned when the scan finds no match.
func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if filter.ID != nil {
		chk, err := s.FindCheckByID(ctx, *filter.ID)
		if err != nil {
			return nil, &influxdb.Error{Err: err}
		}
		return chk, nil
	}

	var match influxdb.Check
	scan := func(tx Tx) error {
		if filter.Org != nil {
			org, err := s.findOrganizationByName(ctx, tx, *filter.Org)
			if err != nil {
				return err
			}
			filter.OrgID = &org.ID
		}

		matches := filterChecksFn(filter)
		return s.forEachCheck(ctx, tx, false, func(chk influxdb.Check) bool {
			if !matches(chk) {
				return true
			}
			// First hit wins; stop iterating.
			match = chk
			return false
		})
	}

	if err := s.kv.View(ctx, scan); err != nil {
		return nil, &influxdb.Error{Err: err}
	}
	if match == nil {
		return nil, &influxdb.Error{Code: influxdb.ENotFound, Msg: "check not found"}
	}
	return match, nil
}
// filterChecksFn returns a predicate reporting whether a check satisfies
// filter. Only the ID, Name and OrgID fields are consulted, and a nil field
// places no constraint on the check.
func filterChecksFn(filter influxdb.CheckFilter) func(c influxdb.Check) bool {
	return func(c influxdb.Check) bool {
		switch {
		case filter.ID != nil && c.GetID() != *filter.ID:
			return false
		case filter.Name != nil && c.GetName() != *filter.Name:
			return false
		case filter.OrgID != nil && c.GetOrgID() != *filter.OrgID:
			return false
		}
		return true
	}
}
// FindChecks retrieves all checks that match filter, along with their count.
//
// When filter.ID is set the single check is fetched directly by ID and opts
// are not consulted. Otherwise the matching checks are collected by findChecks
// inside a kv View transaction.
func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, opts ...influxdb.FindOptions) ([]influxdb.Check, int, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if filter.ID != nil {
		chk, err := s.FindCheckByID(ctx, *filter.ID)
		if err != nil {
			return nil, 0, err
		}
		return []influxdb.Check{chk}, 1, nil
	}

	found := []influxdb.Check{}
	err := s.kv.View(ctx, func(tx Tx) error {
		matched, err := s.findChecks(ctx, tx, filter, opts...)
		if err == nil {
			found = matched
		}
		return err
	})
	if err != nil {
		return nil, 0, err
	}
	return found, len(found), nil
}
// findChecks scans the checks in tx and returns those that satisfy filter.
//
// filter.Org, when set, is first resolved to an organization ID. Only the
// first element of opts is honoured: Offset matching checks are skipped,
// iteration stops once Limit checks have been collected (a Limit of zero or
// less means no limit), and Descending reverses the iteration order.
func (s *Service) findChecks(ctx context.Context, tx Tx, filter influxdb.CheckFilter, opts ...influxdb.FindOptions) ([]influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if filter.Org != nil {
		org, err := s.findOrganizationByName(ctx, tx, *filter.Org)
		if err != nil {
			return nil, &influxdb.Error{Err: err}
		}
		filter.OrgID = &org.ID
	}

	var (
		skip, maxResults int
		reverse          bool
	)
	if len(opts) > 0 {
		first := opts[0]
		skip, maxResults, reverse = first.Offset, first.Limit, first.Descending
	}

	matches := filterChecksFn(filter)
	results := []influxdb.Check{}
	seen := 0
	err := s.forEachCheck(ctx, tx, reverse, func(chk influxdb.Check) bool {
		if matches(chk) {
			if seen >= skip {
				results = append(results, chk)
			}
			seen++
		}
		// Keep going unless a positive limit has been reached.
		return maxResults <= 0 || len(results) < maxResults
	})
	if err != nil {
		return nil, &influxdb.Error{Err: err}
	}
	return results, nil
}
// CreateCheck stores a new check, assigning its ID, inside a kv Update
// transaction.
func (s *Service) CreateCheck(ctx context.Context, c influxdb.Check) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	create := func(tx Tx) error {
		return s.createCheck(ctx, tx, c)
	}
	return s.kv.Update(ctx, create)
}
// createCheck validates and stores a new check within tx.
//
// When the check carries a valid organization ID, that organization must
// exist; otherwise the lookup error is returned wrapped with the
// OpCreateCheck operation. An EConflict error is returned when a check with
// the same name already exists in the organization. On success the check is
// given a fresh ID and creation/update timestamps, is written to the store,
// and receives user resource mappings derived from its organization.
func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.Check) error {
	// Start the span unconditionally so every step below is traced under it,
	// matching the other helpers in this file.
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if c.GetOrgID().Valid() {
		if _, err := s.findOrganizationByID(ctx, tx, c.GetOrgID()); err != nil {
			return &influxdb.Error{
				Op:  influxdb.OpCreateCheck,
				Err: err,
			}
		}
	}

	// The name must be unique within the organization. Only a successful
	// lookup counts as a conflict; any lookup error is treated as the name
	// being available.
	if _, err := s.findCheckByName(ctx, tx, c.GetOrgID(), c.GetName()); err == nil {
		return &influxdb.Error{
			Code: influxdb.EConflict,
			Msg:  fmt.Sprintf("check with name %s already exists", c.GetName()),
		}
	}

	c.SetID(s.IDGenerator.ID())
	c.SetCreatedAt(s.Now())
	c.SetUpdatedAt(s.Now())

	if err := s.putCheck(ctx, tx, c); err != nil {
		return err
	}
	return s.createCheckUserResourceMappings(ctx, tx, c)
}
// PutCheck writes c to the store inside a kv Update transaction without
// assigning a new ID.
func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {
	return s.kv.Update(ctx, func(tx Tx) error {
		return s.putCheck(ctx, tx, c)
	})
}
// createCheckUserResourceMappings looks up the user resource mappings of the
// check's organization and creates, for each of them, a mapping on the check
// with the same user ID and user type. Any failure is wrapped in an
// influxdb.Error.
func (s *Service) createCheckUserResourceMappings(ctx context.Context, tx Tx, c influxdb.Check) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	orgFilter := influxdb.UserResourceMappingFilter{
		ResourceType: influxdb.OrgsResourceType,
		ResourceID:   c.GetOrgID(),
	}
	orgMappings, err := s.findUserResourceMappings(ctx, tx, orgFilter)
	if err != nil {
		return &influxdb.Error{Err: err}
	}

	for _, om := range orgMappings {
		mapping := &influxdb.UserResourceMapping{
			ResourceType: influxdb.ChecksResourceType,
			ResourceID:   c.GetID(),
			UserID:       om.UserID,
			UserType:     om.UserType,
		}
		if err := s.createUserResourceMapping(ctx, tx, mapping); err != nil {
			return &influxdb.Error{Err: err}
		}
	}
	return nil
}
// putCheck validates c and writes it to tx.
//
// Two entries are written: the index bucket maps the index key (organization
// ID followed by the check name) to the encoded check ID, and the checks
// bucket maps the encoded check ID to the JSON encoding of c.
//
// It returns the validation error as is, the error from building the index
// key or obtaining either bucket as is, and wraps marshalling, ID encoding
// and bucket write failures in an influxdb.Error.
func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check) error {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if err := c.Valid(); err != nil {
		return err
	}

	v, err := json.Marshal(c)
	if err != nil {
		return &influxdb.Error{Err: err}
	}

	encodedID, err := c.GetID().Encode()
	if err != nil {
		return &influxdb.Error{Err: err}
	}

	// The error returned by checkIndexKey itself must be checked here, not
	// the one left over from the ID encoding above.
	key, err := checkIndexKey(c.GetOrgID(), c.GetName())
	if err != nil {
		return err
	}

	idx, err := s.checksIndexBucket(tx)
	if err != nil {
		return err
	}
	if err := idx.Put(key, encodedID); err != nil {
		return &influxdb.Error{Err: err}
	}

	// Bail out before touching the bucket if it could not be obtained, and
	// make sure the result of Put is what gets inspected.
	bkt, err := s.checksBucket(tx)
	if err != nil {
		return err
	}
	if err := bkt.Put(encodedID, v); err != nil {
		return &influxdb.Error{Err: err}
	}
	return nil
}
// checkIndexKey builds the index key for a check: the encoded organization ID
// occupying the first influxdb.IDLength bytes, followed by the bytes of name.
// An organization ID that cannot be encoded yields an EInvalid error.
func checkIndexKey(orgID influxdb.ID, name string) ([]byte, error) {
	prefix, err := orgID.Encode()
	if err != nil {
		return nil, &influxdb.Error{Code: influxdb.EInvalid, Err: err}
	}

	key := make([]byte, influxdb.IDLength+len(name))
	copy(key, prefix)
	copy(key[influxdb.IDLength:], name)
	return key, nil
}
// forEachCheck walks the checks bucket of tx with a cursor, decoding each
// value and passing it to fn until fn returns false or the cursor is
// exhausted. Iteration starts at the last entry and moves backwards when
// descending is true, and at the first entry moving forwards otherwise. A
// decoding failure aborts the walk and is returned.
func (s *Service) forEachCheck(ctx context.Context, tx Tx, descending bool, fn func(influxdb.Check) bool) error {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	checks, err := s.checksBucket(tx)
	if err != nil {
		return err
	}

	cur, err := checks.Cursor()
	if err != nil {
		return err
	}

	// Pick the cursor movements once, based on the requested direction.
	start, advance := cur.First, cur.Next
	if descending {
		start, advance = cur.Last, cur.Prev
	}

	for k, v := start(); k != nil; k, v = advance() {
		chk, err := check.UnmarshalJSON(v)
		if err != nil {
			return err
		}
		if !fn(chk) {
			break
		}
	}
	return nil
}
// PatchCheck applies the fields set on upd to the check stored under id,
// inside a kv Update transaction, and returns the patched check.
func (s *Service) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	var patched influxdb.Check
	apply := func(tx Tx) error {
		chk, err := s.patchCheck(ctx, tx, id, upd)
		if err == nil {
			patched = chk
		}
		return err
	}

	err := s.kv.Update(ctx, apply)
	return patched, err
}
// UpdateCheck replaces the check stored under id with chk, inside a kv Update
// transaction, and returns the stored check.
func (s *Service) UpdateCheck(ctx context.Context, id influxdb.ID, chk influxdb.Check) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	var updated influxdb.Check
	apply := func(tx Tx) error {
		stored, err := s.updateCheck(ctx, tx, id, chk)
		if err == nil {
			updated = stored
		}
		return err
	}

	err := s.kv.Update(ctx, apply)
	return updated, err
}
// updateCheck overwrites the check stored under id with chk within tx.
//
// When the name changes, an EConflict error is returned if another check in
// the same organization already uses the new name; otherwise the index entry
// for the old name is deleted. The ID, organization ID and creation time are
// carried over from the stored check and the update time is set to s.Now()
// before chk is written.
func (s *Service) updateCheck(ctx context.Context, tx Tx, id influxdb.ID, chk influxdb.Check) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	existing, err := s.findCheckByID(ctx, tx, id)
	if err != nil {
		return nil, err
	}

	if chk.GetName() != existing.GetName() {
		other, lookupErr := s.findCheckByName(ctx, tx, existing.GetOrgID(), chk.GetName())
		if lookupErr == nil && other.GetID() != id {
			return nil, &influxdb.Error{
				Code: influxdb.EConflict,
				Msg:  "check name is not unique",
			}
		}

		// Drop the index entry that still points at the old name.
		staleKey, err := checkIndexKey(existing.GetOrgID(), existing.GetName())
		if err != nil {
			return nil, err
		}
		index, err := s.checksIndexBucket(tx)
		if err != nil {
			return nil, err
		}
		if err := index.Delete(staleKey); err != nil {
			return nil, err
		}
	}

	// ID and OrganizationID can not be updated.
	chk.SetID(existing.GetID())
	chk.SetOrgID(existing.GetOrgID())
	chk.SetCreatedAt(existing.GetCRUDLog().CreatedAt)
	chk.SetUpdatedAt(s.Now())

	if err := s.putCheck(ctx, tx, chk); err != nil {
		return nil, err
	}
	return chk, nil
}
// patchCheck applies the non-nil fields of upd (Name, Description, Status) to
// the check stored under id within tx and writes the result back.
//
// When a name is supplied, an EConflict error is returned if another check in
// the same organization already uses it; otherwise the index entry for the
// current name is deleted before the name is changed. The update time is set
// to s.Now().
func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	target, err := s.findCheckByID(ctx, tx, id)
	if err != nil {
		return nil, err
	}

	if upd.Name != nil {
		other, lookupErr := s.findCheckByName(ctx, tx, target.GetOrgID(), *upd.Name)
		if lookupErr == nil && other.GetID() != id {
			return nil, &influxdb.Error{
				Code: influxdb.EConflict,
				Msg:  "check name is not unique",
			}
		}

		// Drop the index entry for the current name before renaming.
		staleKey, err := checkIndexKey(target.GetOrgID(), target.GetName())
		if err != nil {
			return nil, err
		}
		index, err := s.checksIndexBucket(tx)
		if err != nil {
			return nil, err
		}
		if err := index.Delete(staleKey); err != nil {
			return nil, err
		}

		target.SetName(*upd.Name)
	}

	if upd.Description != nil {
		target.SetDescription(*upd.Description)
	}
	if upd.Status != nil {
		target.SetStatus(*upd.Status)
	}
	target.SetUpdatedAt(s.Now())

	if err := s.putCheck(ctx, tx, target); err != nil {
		return nil, err
	}
	return target, nil
}
// findCheckByName resolves the check named n in organization orgID through
// the index bucket of tx, then loads it by the ID found there.
//
// A key that cannot be built yields an EInvalid error, a missing index entry
// yields an ENotFound error, and an undecodable ID is wrapped in an
// influxdb.Error.
func (s *Service) findCheckByName(ctx context.Context, tx Tx, orgID influxdb.ID, n string) (influxdb.Check, error) {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	key, err := checkIndexKey(orgID, n)
	if err != nil {
		return nil, &influxdb.Error{Code: influxdb.EInvalid, Err: err}
	}

	index, err := s.checksIndexBucket(tx)
	if err != nil {
		return nil, err
	}

	encoded, err := index.Get(key)
	switch {
	case IsNotFound(err):
		return nil, &influxdb.Error{
			Code: influxdb.ENotFound,
			Msg:  fmt.Sprintf("check %q not found", n),
		}
	case err != nil:
		return nil, err
	}

	var id influxdb.ID
	if err := id.Decode(encoded); err != nil {
		return nil, &influxdb.Error{Err: err}
	}
	return s.findCheckByID(ctx, tx, id)
}
// DeleteCheck removes the check stored under id, together with its index
// entry, inside a kv Update transaction.
func (s *Service) DeleteCheck(ctx context.Context, id influxdb.ID) error {
	return s.kv.Update(ctx, func(tx Tx) error {
		return s.deleteCheck(ctx, tx, id)
	})
}
// deleteCheck removes the check stored under id within tx. In order, it loads
// the check, deletes its index entry, deletes the check itself from the
// checks bucket, and deletes the user resource mappings that reference the
// check. The first failure stops the sequence and is returned.
func (s *Service) deleteCheck(ctx context.Context, tx Tx, id influxdb.ID) error {
	existing, err := s.findCheckByID(ctx, tx, id)
	if err != nil {
		return err
	}

	indexKey, err := checkIndexKey(existing.GetOrgID(), existing.GetName())
	if err != nil {
		return err
	}

	index, err := s.checksIndexBucket(tx)
	if err != nil {
		return err
	}
	if err := index.Delete(indexKey); err != nil {
		return &influxdb.Error{Err: err}
	}

	checkKey, err := id.Encode()
	if err != nil {
		return &influxdb.Error{Code: influxdb.EInvalid, Err: err}
	}

	checks, err := s.checksBucket(tx)
	if err != nil {
		return err
	}
	if err := checks.Delete(checkKey); err != nil {
		return &influxdb.Error{Err: err}
	}

	mappings := influxdb.UserResourceMappingFilter{
		ResourceID:   id,
		ResourceType: influxdb.ChecksResourceType,
	}
	if err := s.deleteUserResourceMappings(ctx, tx, mappings); err != nil {
		return err
	}
	return nil
}