chore(kv): add new/update checks to kv store base and index store

pull/16260/head
Johnny Steenbergen 2020-01-06 15:31:50 -08:00 committed by Johnny Steenbergen
parent fb2080f223
commit ec35815553
9 changed files with 420 additions and 225 deletions

View File

@ -2,7 +2,6 @@ package kv
import (
"context"
"fmt"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
@ -21,7 +20,7 @@ func newCheckStore() *IndexStore {
var decValToEntFn ConvertValToEntFn = func(_ []byte, v interface{}) (Entity, error) {
ch, ok := v.(influxdb.Check)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
return Entity{
@ -126,7 +125,7 @@ func (s *Service) FindCheck(ctx context.Context, filter influxdb.CheckFilter) (i
Limit: 1,
FilterEntFn: func(k []byte, v interface{}) bool {
ch, ok := v.(influxdb.Check)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return false
}
return filterFn(ch)
@ -221,14 +220,14 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o
Prefix: prefix,
FilterEntFn: func(k []byte, v interface{}) bool {
ch, ok := v.(influxdb.Check)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return false
}
return filterFn(ch)
},
CaptureFn: func(key []byte, decodedVal interface{}) error {
c, ok := decodedVal.(influxdb.Check)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return err
}
checks = append(checks, c)
@ -271,14 +270,6 @@ func (s *Service) CreateCheck(ctx context.Context, c influxdb.CheckCreate, userI
}
func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.CheckCreate, userID influxdb.ID) error {
// check name unique
if _, err := s.findCheckByName(ctx, tx, c.GetOrgID(), c.GetName()); err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("check with name %s already exists", c.GetName()),
}
}
c.SetID(s.IDGenerator.ID())
c.SetOwnerID(userID)
now := s.Now()
@ -295,7 +286,7 @@ func (s *Service) createCheck(ctx context.Context, tx Tx, c influxdb.CheckCreate
}
c.SetTaskID(t.ID)
if err := s.putCheck(ctx, tx, c); err != nil {
if err := s.putCheck(ctx, tx, c, PutNew()); err != nil {
return err
}
@ -334,12 +325,12 @@ func (s *Service) PutCheck(ctx context.Context, c influxdb.Check) error {
})
}
func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check) error {
func (s *Service) putCheck(ctx context.Context, tx Tx, c influxdb.Check, opts ...PutOptionFn) error {
return s.checkStore.Put(ctx, tx, Entity{
PK: EncID(c.GetID()),
UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())),
Body: c,
})
}, opts...)
}
// PatchCheck updates a check according the parameters set on upd.
@ -455,20 +446,6 @@ func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
if upd.Name != nil {
c0, err := s.findCheckByName(ctx, tx, c.GetOrgID(), *upd.Name)
if err == nil && c0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "check name is not unique",
}
}
ent := Entity{
UniqueKey: Encode(EncID(c.GetOrgID()), EncString(c.GetName())),
}
if err := s.checkStore.IndexStore.DeleteEnt(ctx, tx, ent); err != nil {
return nil, err
}
c.SetName(*upd.Name)
}
@ -489,11 +466,11 @@ func (s *Service) patchCheck(ctx context.Context, tx Tx, id influxdb.ID, upd inf
return nil, err
}
if _, err := s.updateTask(ctx, tx, c.GetTaskID(), tu); err != nil {
if err := s.putCheck(ctx, tx, c, PutUpdate()); err != nil {
return nil, err
}
if err := s.putCheck(ctx, tx, c); err != nil {
if _, err := s.updateTask(ctx, tx, c.GetTaskID(), tu); err != nil {
return nil, err
}

View File

@ -2,7 +2,6 @@ package kv
import (
"context"
"fmt"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
@ -29,7 +28,7 @@ func newEndpointStore() *IndexStore {
var decValToEntFn ConvertValToEntFn = func(_ []byte, v interface{}) (Entity, error) {
edp, ok := v.(influxdb.NotificationEndpoint)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
return Entity{
@ -65,13 +64,7 @@ func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp inf
return err
}
}
// notification endpoint name unique
if _, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), edp.GetName()); err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("notification endpoint with name %s already exists", edp.GetName()),
}
}
id := s.IDGenerator.ID()
edp.SetID(id)
now := s.TimeGenerator.Now()
@ -88,7 +81,7 @@ func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp inf
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent); err != nil {
if err := s.endpointStore.Put(ctx, tx, ent, PutNew()); err != nil {
return err
}
@ -101,21 +94,6 @@ func (s *Service) createNotificationEndpoint(ctx context.Context, tx Tx, edp inf
return s.createUserResourceMapping(ctx, tx, urm)
}
func (s *Service) findNotificationEndpointByName(ctx context.Context, tx Tx, orgID influxdb.ID, name string) (influxdb.NotificationEndpoint, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
body, err := s.endpointStore.FindEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(orgID), EncString(name)),
})
if err != nil {
return nil, err
}
edp, ok := body.(influxdb.NotificationEndpoint)
return edp, errUnexpectedDecodeVal(ok)
}
// UpdateNotificationEndpoint updates a single notification endpoint.
// Returns the new notification endpoint after update.
func (s *Service) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, edp influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) {
@ -133,27 +111,7 @@ func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id infl
return nil, err
}
if edpName, curName := edp.GetName(), current.GetName(); edpName != curName {
edp0, err := s.findNotificationEndpointByName(ctx, tx, current.GetOrgID(), edpName)
// TODO: when can id every be zero value from store?... feels off
if err == nil && edp0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "notification endpoint name is not unique",
}
}
err = s.endpointStore.IndexStore.DeleteEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(curName)),
})
if err != nil {
return nil, err
}
}
// ID and OrganizationID can not be updated
edp.SetID(current.GetID())
edp.SetOrgID(current.GetOrgID())
edp.SetCreatedAt(current.GetCRUDLog().CreatedAt)
edp.SetUpdatedAt(s.TimeGenerator.Now())
@ -166,7 +124,7 @@ func (s *Service) updateNotificationEndpoint(ctx context.Context, tx Tx, id infl
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent); err != nil {
if err := s.endpointStore.Put(ctx, tx, ent, PutUpdate()); err != nil {
return nil, err
}
@ -195,22 +153,6 @@ func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influ
if err != nil {
return nil, err
}
if upd.Name != nil {
edp0, err := s.findNotificationEndpointByName(ctx, tx, edp.GetOrgID(), *upd.Name)
if err == nil && edp0.GetID() != id {
return nil, &influxdb.Error{
Code: influxdb.EConflict,
Msg: "notification endpoint name is not unique",
}
}
err = s.endpointStore.IndexStore.DeleteEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
})
if err != nil {
return nil, err
}
}
if upd.Name != nil {
edp.SetName(*upd.Name)
@ -234,7 +176,7 @@ func (s *Service) patchNotificationEndpoint(ctx context.Context, tx Tx, id influ
UniqueKey: Encode(EncID(edp.GetOrgID()), EncString(edp.GetName())),
Body: edp,
}
if err := s.endpointStore.Put(ctx, tx, ent); err != nil {
if err := s.endpointStore.Put(ctx, tx, ent, PutUpdate()); err != nil {
return nil, err
}
@ -280,7 +222,7 @@ func (s *Service) findNotificationEndpointByID(ctx context.Context, tx Tx, id in
return nil, err
}
edp, ok := decodedEnt.(influxdb.NotificationEndpoint)
return edp, errUnexpectedDecodeVal(ok)
return edp, IsErrUnexpectedDecodeVal(ok)
}
// FindNotificationEndpoints returns a list of notification endpoints that match isNext and the total count of matching notification endpoints.
@ -331,7 +273,7 @@ func (s *Service) findNotificationEndpoints(ctx context.Context, tx Tx, filter i
FilterEntFn: filterEndpointsFn(idMap, filter),
CaptureFn: func(k []byte, v interface{}) error {
edp, ok := v.(influxdb.NotificationEndpoint)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return err
}
edps = append(edps, edp)

View File

@ -73,7 +73,7 @@ func DecodeOrgNameKey(k []byte) (influxdb.ID, string, error) {
func NewOrgNameKeyStore(resource string, bktName []byte, caseSensitive bool) *StoreBase {
var decValToEntFn ConvertValToEntFn = func(k []byte, v interface{}) (Entity, error) {
id, ok := v.(influxdb.ID)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
@ -275,11 +275,54 @@ func (s *StoreBase) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{}
return s.decodeEnt(ctx, body)
}
type (
putOption struct {
isNew bool
isUpdate bool
}
// PutOptionFn provides a hint to the store to make some guarantees about the
// put action. I.e. If it is new, then will validate there is no existing entity
// by the given PK.
PutOptionFn func(o *putOption) error
)
// PutNew will create an entity that is not does not already exist. Guarantees uniqueness
// by the store's uniqueness guarantees.
func PutNew() PutOptionFn {
return func(o *putOption) error {
o.isNew = true
return nil
}
}
// PutUpdate will update an entity that must already exist.
func PutUpdate() PutOptionFn {
return func(o *putOption) error {
o.isUpdate = true
return nil
}
}
// Put will persist the entity.
func (s *StoreBase) Put(ctx context.Context, tx Tx, ent Entity) error {
func (s *StoreBase) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var opt putOption
for _, o := range opts {
if err := o(&opt); err != nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Err: err,
}
}
}
if err := s.putValidate(ctx, tx, ent, opt); err != nil {
return err
}
encodedID, err := s.EntKey(ctx, ent)
if err != nil {
return err
@ -293,6 +336,25 @@ func (s *StoreBase) Put(ctx context.Context, tx Tx, ent Entity) error {
return s.bucketPut(ctx, tx, encodedID, body)
}
func (s *StoreBase) putValidate(ctx context.Context, tx Tx, ent Entity, opt putOption) error {
if !opt.isUpdate && !opt.isNew {
return nil
}
_, err := s.FindEnt(ctx, tx, ent)
if opt.isNew {
if err == nil || influxdb.ErrorCode(err) != influxdb.ENotFound {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("%s is not unique", s.Resource),
Err: err,
}
}
return nil
}
return err
}
func (s *StoreBase) bucket(ctx context.Context, tx Tx) (Bucket, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -548,7 +610,7 @@ func (i *iterator) seek(ctx context.Context) {
}
func errUnexpectedDecodeVal(ok bool) error {
func IsErrUnexpectedDecodeVal(ok bool) error {
if ok {
return nil
}

View File

@ -36,16 +36,91 @@ func TestStoreBase(t *testing.T) {
}
t.Run("Put", func(t *testing.T) {
base, done, inmemStore := newFooStoreBase(t, "put")
defer done()
testPutBase(t, inmemStore, base, base.BktName)
t.Run("basic", func(t *testing.T) {
base, done, kvStore := newFooStoreBase(t, "put")
defer done()
testPutBase(t, kvStore, base, base.BktName)
})
t.Run("new", func(t *testing.T) {
base, done, kvStore := newFooStoreBase(t, "put")
defer done()
expected := newFooEnt(3, 33, "name3")
update(t, kvStore, func(tx kv.Tx) error {
return base.Put(context.TODO(), tx, expected, kv.PutNew())
})
var actual interface{}
view(t, kvStore, func(tx kv.Tx) error {
f, err := base.FindEnt(context.TODO(), tx, kv.Entity{PK: expected.PK})
actual = f
return err
})
assert.Equal(t, expected.Body, actual)
})
t.Run("update", func(t *testing.T) {
base, done, kvStore := newFooStoreBase(t, "put")
defer done()
expected := testPutBase(t, kvStore, base, base.BktName)
updateEnt := newFooEnt(expected.ID, expected.OrgID, "new name")
update(t, kvStore, func(tx kv.Tx) error {
return base.Put(context.TODO(), tx, updateEnt, kv.PutUpdate())
})
var actual interface{}
view(t, kvStore, func(tx kv.Tx) error {
f, err := base.FindEnt(context.TODO(), tx, kv.Entity{PK: kv.EncID(expected.ID)})
actual = f
return err
})
expected.Name = "new name"
assert.Equal(t, expected, actual)
})
t.Run("error cases", func(t *testing.T) {
t.Run("new entity conflicts with existing", func(t *testing.T) {
base, done, kvStore := newFooStoreBase(t, "put")
defer done()
expected := testPutBase(t, kvStore, base, base.BktName)
err := kvStore.Update(context.TODO(), func(tx kv.Tx) error {
entCopy := newFooEnt(expected.ID, expected.OrgID, expected.Name)
return base.Put(context.TODO(), tx, entCopy, kv.PutNew())
})
require.Error(t, err)
assert.Equal(t, influxdb.EConflict, influxdb.ErrorCode(err))
})
})
t.Run("updating entity that does not exist", func(t *testing.T) {
base, done, kvStore := newFooStoreBase(t, "put")
defer done()
expected := testPutBase(t, kvStore, base, base.BktName)
err := kvStore.Update(context.TODO(), func(tx kv.Tx) error {
// ent by id does not exist
entCopy := newFooEnt(333, expected.OrgID, "name1")
return base.Put(context.TODO(), tx, entCopy, kv.PutUpdate())
})
require.Error(t, err)
assert.Equal(t, influxdb.ENotFound, influxdb.ErrorCode(err))
})
})
t.Run("DeleteEnt", func(t *testing.T) {
base, done, inmemStore := newFooStoreBase(t, "delete_ent")
base, done, kvStore := newFooStoreBase(t, "delete_ent")
defer done()
testDeleteEntBase(t, inmemStore, base)
testDeleteEntBase(t, kvStore, base)
})
t.Run("Delete", func(t *testing.T) {
@ -55,10 +130,10 @@ func TestStoreBase(t *testing.T) {
})
t.Run("FindEnt", func(t *testing.T) {
base, done, inmemStore := newFooStoreBase(t, "find_ent")
base, done, kvStore := newFooStoreBase(t, "find_ent")
defer done()
testFindEnt(t, inmemStore, base)
testFindEnt(t, kvStore, base)
})
t.Run("Find", func(t *testing.T) {
@ -150,17 +225,17 @@ func testDeleteBase(t *testing.T, fn func(t *testing.T, suffix string) (storeBas
fn := func(t *testing.T) {
t.Helper()
base, done, inmemStore := fn(t, "delete")
base, done, kvStore := fn(t, "delete")
defer done()
seedEnts(t, inmemStore, base, expectedEnts...)
seedEnts(t, kvStore, base, expectedEnts...)
update(t, inmemStore, func(tx kv.Tx) error {
update(t, kvStore, func(tx kv.Tx) error {
return base.Delete(context.TODO(), tx, tt.opts)
})
var actuals []interface{}
view(t, inmemStore, func(tx kv.Tx) error {
view(t, kvStore, func(tx kv.Tx) error {
return base.Find(context.TODO(), tx, kv.FindOpts{
CaptureFn: func(key []byte, decodedVal interface{}) error {
actuals = append(actuals, decodedVal)
@ -179,7 +254,7 @@ func testDeleteBase(t *testing.T, fn func(t *testing.T, suffix string) (storeBas
}
for _, assertFn := range assertFns {
assertFn(t, inmemStore, base, entsLeft)
assertFn(t, kvStore, base, entsLeft)
}
}
t.Run(tt.name, fn)
@ -297,7 +372,7 @@ type storeBase interface {
DeleteEnt(ctx context.Context, tx kv.Tx, ent kv.Entity) error
FindEnt(ctx context.Context, tx kv.Tx, ent kv.Entity) (interface{}, error)
Find(ctx context.Context, tx kv.Tx, opts kv.FindOpts) error
Put(ctx context.Context, tx kv.Tx, ent kv.Entity) error
Put(ctx context.Context, tx kv.Tx, ent kv.Entity, opts ...kv.PutOptionFn) error
}
func seedEnts(t *testing.T, kvStore kv.Store, store storeBase, ents ...kv.Entity) {

View File

@ -1,7 +1,10 @@
package kv
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
@ -180,24 +183,12 @@ func (s *IndexStore) FindEnt(ctx context.Context, tx Tx, ent Entity) (interface{
}
}
}
if err != nil {
return s.findByIndex(ctx, tx, ent)
}
return s.EntStore.FindEnt(ctx, tx, ent)
}
// Put will persist the entity into both the entity store and the index store.
func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if err := s.IndexStore.Put(ctx, tx, ent); err != nil {
return err
}
return s.EntStore.Put(ctx, tx, ent)
}
func (s *IndexStore) findByIndex(ctx context.Context, tx Tx, ent Entity) (interface{}, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -219,3 +210,110 @@ func (s *IndexStore) findByIndex(ctx context.Context, tx Tx, ent Entity) (interf
return s.EntStore.FindEnt(ctx, tx, indexEnt)
}
// Put will persist the entity into both the entity store and the index store.
func (s *IndexStore) Put(ctx context.Context, tx Tx, ent Entity, opts ...PutOptionFn) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
var opt putOption
for _, o := range opts {
if err := o(&opt); err != nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Err: err,
}
}
}
if err := s.putValidate(ctx, tx, ent, opt); err != nil {
return err
}
if err := s.IndexStore.Put(ctx, tx, ent); err != nil {
return err
}
return s.EntStore.Put(ctx, tx, ent)
}
func (s *IndexStore) putValidate(ctx context.Context, tx Tx, ent Entity, opt putOption) error {
if opt.isNew {
return s.validNew(ctx, tx, ent)
}
if opt.isUpdate {
return s.validUpdate(ctx, tx, ent)
}
return nil
}
func (s *IndexStore) validNew(ctx context.Context, tx Tx, ent Entity) error {
_, err := s.IndexStore.FindEnt(ctx, tx, ent)
if err == nil || influxdb.ErrorCode(err) != influxdb.ENotFound {
key, _ := s.IndexStore.EntKey(ctx, ent)
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("%s is not unique for key %s", s.Resource, string(key)),
Err: err,
}
}
if _, err := s.EntStore.FindEnt(ctx, tx, ent); err != nil && influxdb.ErrorCode(err) != influxdb.ENotFound {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}
return nil
}
func (s *IndexStore) validUpdate(ctx context.Context, tx Tx, ent Entity) error {
idxVal, err := s.IndexStore.FindEnt(ctx, tx, ent)
if err != nil {
if influxdb.ErrorCode(err) == influxdb.ENotFound {
return nil
}
return err
}
idxKey, err := s.IndexStore.EntKey(ctx, ent)
if err != nil {
return err
}
indexEnt, err := s.IndexStore.ConvertValToEntFn(idxKey, idxVal)
if err != nil {
return err
}
if err := sameKeys(ent.PK, indexEnt.PK); err != nil {
if _, err := s.EntStore.FindEnt(ctx, tx, ent); influxdb.ErrorCode(err) == influxdb.ENotFound {
key, _ := ent.PK()
return &influxdb.Error{
Code: influxdb.ENotFound,
Msg: fmt.Sprintf("%s does not exist for key %s", s.Resource, string(key)),
Err: err,
}
}
key, _ := indexEnt.UniqueKey()
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("%s entity update conflicts with an existing entity for key %s", s.Resource, string(key)),
}
}
return s.IndexStore.DeleteEnt(ctx, tx, ent)
}
func sameKeys(key1, key2 EncodeFn) error {
pk1, err := key1()
if err != nil {
return err
}
pk2, err := key2()
if err != nil {
return err
}
if !bytes.Equal(pk1, pk2) {
return errors.New("keys differ")
}
return nil
}

View File

@ -4,6 +4,8 @@ import (
"context"
"testing"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -17,7 +19,7 @@ func TestIndexStore(t *testing.T) {
newFooIndexStore := func(t *testing.T, bktSuffix string) (*kv.IndexStore, func(), kv.Store) {
t.Helper()
inmemStore, done, err := NewTestBoltStore(t)
kvStoreStore, done, err := NewTestBoltStore(t)
require.NoError(t, err)
const resource = "foo"
@ -28,31 +30,130 @@ func TestIndexStore(t *testing.T) {
IndexStore: kv.NewOrgNameKeyStore(resource, []byte("foo_idx_"+bktSuffix), false),
}
return indexStore, done, inmemStore
return indexStore, done, kvStoreStore
}
t.Run("Put", func(t *testing.T) {
indexStore, done, inmem := newFooIndexStore(t, "put")
defer done()
t.Run("basic", func(t *testing.T) {
indexStore, done, kvStore := newFooIndexStore(t, "put")
defer done()
expected := testPutBase(t, inmem, indexStore, indexStore.EntStore.BktName)
expected := testPutBase(t, kvStore, indexStore, indexStore.EntStore.BktName)
key, err := indexStore.IndexStore.EntKey(context.TODO(), kv.Entity{
UniqueKey: kv.Encode(kv.EncID(expected.OrgID), kv.EncString(expected.Name)),
key, err := indexStore.IndexStore.EntKey(context.TODO(), kv.Entity{
UniqueKey: kv.Encode(kv.EncID(expected.OrgID), kv.EncString(expected.Name)),
})
require.NoError(t, err)
rawIndex := getEntRaw(t, kvStore, indexStore.IndexStore.BktName, key)
assert.Equal(t, encodeID(t, expected.ID), rawIndex)
})
require.NoError(t, err)
rawIndex := getEntRaw(t, inmem, indexStore.IndexStore.BktName, key)
assert.Equal(t, encodeID(t, expected.ID), rawIndex)
t.Run("new entity", func(t *testing.T) {
indexStore, done, kvStore := newFooIndexStore(t, "put")
defer done()
expected := foo{ID: 3, OrgID: 33, Name: "333"}
update(t, kvStore, func(tx kv.Tx) error {
ent := newFooEnt(expected.ID, expected.OrgID, expected.Name)
return indexStore.Put(context.TODO(), tx, ent, kv.PutNew())
})
key, err := indexStore.IndexStore.EntKey(context.TODO(), kv.Entity{
UniqueKey: kv.Encode(kv.EncID(expected.OrgID), kv.EncString(expected.Name)),
})
require.NoError(t, err)
rawIndex := getEntRaw(t, kvStore, indexStore.IndexStore.BktName, key)
assert.Equal(t, encodeID(t, expected.ID), rawIndex)
})
t.Run("updating entity with no naming collision succeeds", func(t *testing.T) {
indexStore, done, kvStore := newFooIndexStore(t, "put")
defer done()
expected := testPutBase(t, kvStore, indexStore, indexStore.EntStore.BktName)
update(t, kvStore, func(tx kv.Tx) error {
entCopy := newFooEnt(expected.ID, expected.OrgID, "safe name")
return indexStore.Put(context.TODO(), tx, entCopy, kv.PutUpdate())
})
err := kvStore.View(context.TODO(), func(tx kv.Tx) error {
_, err := indexStore.FindEnt(context.TODO(), tx, kv.Entity{
PK: kv.EncID(expected.ID),
UniqueKey: kv.Encode(kv.EncID(expected.OrgID), kv.EncString(expected.Name)),
})
return err
})
require.NoError(t, err)
})
t.Run("error cases", func(t *testing.T) {
t.Run("new entity conflicts with existing", func(t *testing.T) {
indexStore, done, kvStore := newFooIndexStore(t, "put")
defer done()
expected := testPutBase(t, kvStore, indexStore, indexStore.EntStore.BktName)
err := kvStore.Update(context.TODO(), func(tx kv.Tx) error {
entCopy := newFooEnt(expected.ID, expected.OrgID, expected.Name)
return indexStore.Put(context.TODO(), tx, entCopy, kv.PutNew())
})
require.Error(t, err)
assert.Equal(t, influxdb.EConflict, influxdb.ErrorCode(err))
})
t.Run("updating entity that does not exist", func(t *testing.T) {
indexStore, done, kvStore := newFooIndexStore(t, "put")
defer done()
expected := testPutBase(t, kvStore, indexStore, indexStore.EntStore.BktName)
update(t, kvStore, func(tx kv.Tx) error {
ent := newFooEnt(9000, expected.OrgID, "name1")
return indexStore.Put(context.TODO(), tx, ent, kv.PutNew())
})
err := kvStore.Update(context.TODO(), func(tx kv.Tx) error {
// ent by id does not exist
entCopy := newFooEnt(333, expected.OrgID, "name1")
return indexStore.Put(context.TODO(), tx, entCopy, kv.PutUpdate())
})
require.Error(t, err)
assert.Equal(t, influxdb.ENotFound, influxdb.ErrorCode(err), "got: "+err.Error())
})
t.Run("updating entity that does collides with an existing entity", func(t *testing.T) {
indexStore, done, kvStore := newFooIndexStore(t, "put")
defer done()
expected := testPutBase(t, kvStore, indexStore, indexStore.EntStore.BktName)
update(t, kvStore, func(tx kv.Tx) error {
ent := newFooEnt(9000, expected.OrgID, "name1")
return indexStore.Put(context.TODO(), tx, ent, kv.PutNew())
})
err := kvStore.Update(context.TODO(), func(tx kv.Tx) error {
// name conflicts
entCopy := newFooEnt(expected.ID, expected.OrgID, "name1")
return indexStore.Put(context.TODO(), tx, entCopy, kv.PutUpdate())
})
require.Error(t, err)
assert.Equal(t, influxdb.EConflict, influxdb.ErrorCode(err))
assert.Contains(t, err.Error(), "update conflicts")
})
})
})
t.Run("DeleteEnt", func(t *testing.T) {
indexStore, done, inmem := newFooIndexStore(t, "delete_ent")
indexStore, done, kvStore := newFooIndexStore(t, "delete_ent")
defer done()
expected := testDeleteEntBase(t, inmem, indexStore)
expected := testDeleteEntBase(t, kvStore, indexStore)
err := inmem.View(context.TODO(), func(tx kv.Tx) error {
err := kvStore.View(context.TODO(), func(tx kv.Tx) error {
_, err := indexStore.IndexStore.FindEnt(context.TODO(), tx, kv.Entity{
UniqueKey: expected.UniqueKey,
})
@ -92,9 +193,9 @@ func TestIndexStore(t *testing.T) {
t.Run("FindEnt", func(t *testing.T) {
t.Run("by ID", func(t *testing.T) {
base, done, inmemStore := newFooIndexStore(t, "find_ent")
base, done, kvStoreStore := newFooIndexStore(t, "find_ent")
defer done()
testFindEnt(t, inmemStore, base)
testFindEnt(t, kvStoreStore, base)
})
t.Run("find by name", func(t *testing.T) {

View File

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"github.com/influxdata/influxdb"
@ -92,7 +91,7 @@ func newVariableStore() *IndexStore {
var decValToEntFn ConvertValToEntFn = func(_ []byte, i interface{}) (entity Entity, err error) {
v, ok := i.(*influxdb.Variable)
if err := errUnexpectedDecodeVal(ok); err != nil {
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
return Entity{}, err
}
return Entity{
@ -200,7 +199,7 @@ func (s *Service) findVariableByID(ctx context.Context, tx Tx, id influxdb.ID) (
}
variable, ok := body.(*influxdb.Variable)
return variable, errUnexpectedDecodeVal(ok)
return variable, IsErrUnexpectedDecodeVal(ok)
}
// CreateVariable creates a new variable and assigns it an ID
@ -213,46 +212,23 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro
}
}
v.Name = strings.TrimSpace(v.Name) // TODO: move to service layer
_, err := s.variableStore.FindEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)),
})
if err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", v.Name),
}
}
v.Name = strings.ToLower(strings.TrimSpace(v.Name)) // TODO: move to service layer
v.ID = s.IDGenerator.ID()
now := s.Now()
v.CreatedAt = now
v.UpdatedAt = now
return s.putVariable(ctx, tx, v)
return s.putVariable(ctx, tx, v, PutNew())
})
}
// ReplaceVariable puts a variable in the store
func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error {
return s.kv.Update(ctx, func(tx Tx) error {
_, err := s.variableStore.FindEnt(ctx, tx, Entity{
PK: EncID(v.ID),
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)),
})
if err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", v.Name),
}
}
return s.putVariable(ctx, tx, v)
return s.putVariable(ctx, tx, v, PutNew())
})
}
func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) error {
func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable, putOpts ...PutOptionFn) error {
if err := s.putVariableOrgsIndex(tx, v); err != nil {
return err
}
@ -262,7 +238,7 @@ func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable)
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)),
Body: v,
}
return s.variableStore.Put(ctx, tx, ent)
return s.variableStore.Put(ctx, tx, ent, putOpts...)
}
// UpdateVariable updates a single variable in the store with a changeset
@ -274,41 +250,13 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in
return err
}
m.UpdatedAt = s.Now()
v = m
if update.Name != "" {
// TODO: should be moved to service layer
update.Name = strings.ToLower(strings.TrimSpace(update.Name))
vbytes, err := s.variableStore.FindEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(update.Name)),
})
if err == nil {
existingVar, ok := vbytes.(*influxdb.Variable)
if err := errUnexpectedDecodeVal(ok); err != nil {
return err
}
if existingVar.ID != v.ID {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", update.Name),
}
}
}
err = s.variableStore.IndexStore.DeleteEnt(ctx, tx, Entity{
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(update.Name)),
})
if err != nil {
return err
}
v.Name = update.Name
}
// TODO: should be moved to service layer
update.Name = strings.ToLower(strings.TrimSpace(update.Name))
update.Apply(m)
return s.putVariable(ctx, tx, v)
return s.putVariable(ctx, tx, v, PutUpdate())
})
return v, err

View File

@ -552,7 +552,7 @@ func CreateCheck(
err: &influxdb.Error{
Code: influxdb.EConflict,
Op: influxdb.OpCreateCheck,
Msg: fmt.Sprintf("check with name name1 already exists"),
Msg: fmt.Sprintf("check is not unique"),
},
},
},
@ -1706,7 +1706,7 @@ func PatchCheck(
upd influxdb.CheckUpdate
}
type wants struct {
err error
err *influxdb.Error
check influxdb.Check
}
@ -1840,7 +1840,7 @@ data = from(bucket: "telegraf") |> range(start: -1m)`,
wants: wants{
err: &influxdb.Error{
Code: influxdb.EConflict,
Msg: "check name is not unique",
Msg: "check entity update conflicts with an existing entity",
},
},
},
@ -1848,12 +1848,12 @@ data = from(bucket: "telegraf") |> range(start: -1m)`,
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, opPrefix, done := init(tt.fields, t)
s, _, done := init(tt.fields, t)
defer done()
ctx := context.Background()
check, err := s.PatchCheck(ctx, tt.args.id, tt.args.upd)
diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t)
influxErrsEqual(t, tt.wants.err, err)
if diff := cmp.Diff(check, tt.wants.check, checkCmpOptions...); diff != "" {
t.Errorf("check is different -got/+want\ndiff %s", diff)

View File

@ -89,7 +89,7 @@ func CreateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
variable *platform.Variable
}
type wants struct {
err error
err *platform.Error
variables []*platform.Variable
}
@ -260,7 +260,7 @@ func CreateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
wants: wants{
err: &platform.Error{
Code: platform.EConflict,
Msg: "variable with name existing-variable already exists",
Msg: "variable is not unique",
},
variables: []*platform.Variable{
{
@ -317,7 +317,7 @@ func CreateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
wants: wants{
err: &platform.Error{
Code: platform.EConflict,
Msg: "variable with name EXISTING-variable already exists",
Msg: "variable is not unique for key ",
},
variables: []*platform.Variable{
{
@ -374,7 +374,7 @@ func CreateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
wants: wants{
err: &platform.Error{
Code: platform.EConflict,
Msg: "variable with name existing-variable already exists",
Msg: "variable is not unique",
},
variables: []*platform.Variable{
{
@ -431,7 +431,7 @@ func CreateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
wants: wants{
err: &platform.Error{
Code: platform.EConflict,
Msg: "variable with name existing-variable already exists",
Msg: "variable is not unique",
},
variables: []*platform.Variable{
{
@ -451,12 +451,12 @@ func CreateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, opPrefix, done := init(tt.fields, t)
s, _, done := init(tt.fields, t)
defer done()
ctx := context.Background()
err := s.CreateVariable(ctx, tt.args.variable)
diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t)
influxErrsEqual(t, tt.wants.err, err)
variables, err := s.FindVariables(ctx, platform.VariableFilter{})
if err != nil {
@ -938,8 +938,7 @@ func UpdateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
wants: wants{
err: &platform.Error{
Code: platform.EConflict,
Op: platform.OpUpdateVariable,
Msg: "variable with name variable-a already exists",
Msg: "variable entity update conflicts with an existing entity",
},
variables: []*platform.Variable{
{
@ -1013,8 +1012,7 @@ func UpdateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
wants: wants{
err: &platform.Error{
Code: platform.EConflict,
Op: platform.OpUpdateVariable,
Msg: "variable with name variable-a already exists",
Msg: "variable entity update conflicts with an existing entity",
},
variables: []*platform.Variable{
{
@ -1055,14 +1053,8 @@ func UpdateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
ctx := context.Background()
variable, err := s.UpdateVariable(ctx, tt.args.id, tt.args.update)
influxErrsEqual(t, tt.wants.err, err)
if err != nil {
if tt.wants.err == nil {
require.NoError(t, err)
}
iErr, ok := err.(*platform.Error)
require.True(t, ok)
assert.Equal(t, iErr.Code, tt.wants.err.Code)
assert.Equal(t, strings.HasPrefix(iErr.Error(), tt.wants.err.Error()), true)
return
}