chore(kv): refactor variable service to use the new uniqByNameStore type

pull/16374/head
Johnny Steenbergen 2019-12-25 23:35:42 -08:00 committed by Johnny Steenbergen
parent 300d89887d
commit 5a6bda238c
5 changed files with 169 additions and 326 deletions

View File

@ -38,6 +38,7 @@ type Service struct {
Hash Crypt
endpointStore *uniqByNameStore
variableStore *uniqByNameStore
}
// NewService returns an instance of a Service.
@ -51,7 +52,8 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
Hash: &Bcrypt{},
kv: kv,
TimeGenerator: influxdb.RealTimeGenerator{},
endpointStore: newEndpointUniqueByNameStore(kv),
endpointStore: newEndpointStore(kv),
variableStore: newVariableUniqueByNameStore(kv),
}
if len(configs) > 0 {
@ -141,7 +143,7 @@ func (s *Service) Initialize(ctx context.Context) error {
return err
}
if err := s.initializeVariables(ctx, tx); err != nil {
if err := s.variableStore.initBuckets(ctx, tx); err != nil {
return err
}

View File

@ -8,22 +8,30 @@ import (
"strings"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
)
var (
variableBucket = []byte("variablesv1")
variableOrgsIndex = []byte("variableorgsv1")
variablesIndex = []byte("variablesindexv1")
)
func (s *Service) initializeVariables(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(variableBucket); err != nil {
return err
func newVariableUniqueByNameStore(kv Store) *uniqByNameStore {
return &uniqByNameStore{
kv: kv,
caseInsensitive: true,
resource: "variable",
bktName: []byte("variablesv1"),
indexName: []byte("variablesindexv1"),
decodeBucketEntFn: func(key, val []byte) ([]byte, interface{}, error) {
var v influxdb.Variable
return key, &v, json.Unmarshal(val, &v)
},
decodeOrgNameFn: func(body []byte) (influxdb.ID, string, error) {
var v influxdb.Variable
err := json.Unmarshal(body, &v)
return v.ID, v.Name, err
},
}
if _, err := tx.Bucket(variableOrgsIndex); err != nil {
return err
}
return nil
}
func decodeVariableOrgsIndexKey(indexKey []byte) (orgID influxdb.ID, variableID influxdb.ID, err error) {
@ -101,61 +109,36 @@ func (s *Service) findVariables(ctx context.Context, tx Tx, filter influxdb.Vari
return s.findOrganizationVariables(ctx, tx, o.ID)
}
variables := []*influxdb.Variable{}
filterFn := filterVariablesFn(filter)
err := s.forEachVariable(ctx, tx, func(m *influxdb.Variable) bool {
if filterFn(m) {
variables = append(variables, m)
}
return true
})
// TODO(jsteenb2): investigate why we don't implement the find options for vars?
var o influxdb.FindOptions
var variables []*influxdb.Variable
err := s.variableStore.Find(ctx, tx, o, filterVariablesFn(filter), func(k []byte, v interface{}) {
variables = append(variables, v.(*influxdb.Variable))
})
if err != nil {
return nil, err
}
return variables, nil
}
func filterVariablesFn(filter influxdb.VariableFilter) func(m *influxdb.Variable) bool {
if filter.ID != nil {
return func(m *influxdb.Variable) bool {
return m.ID == *filter.ID
func filterVariablesFn(filter influxdb.VariableFilter) func([]byte, interface{}) bool {
return func(key []byte, val interface{}) bool {
variable, ok := val.(*influxdb.Variable)
if !ok {
return false
}
}
if filter.OrganizationID != nil {
return func(m *influxdb.Variable) bool {
return m.OrganizationID == *filter.OrganizationID
if filter.ID != nil {
return variable.ID == *filter.ID
}
}
return func(m *influxdb.Variable) bool { return true }
}
// forEachVariable will iterate through all variables while fn returns true.
func (s *Service) forEachVariable(ctx context.Context, tx Tx, fn func(*influxdb.Variable) bool) error {
b, err := tx.Bucket(variableBucket)
if err != nil {
return err
}
cur, err := b.Cursor()
if err != nil {
return err
}
for k, v := cur.First(); k != nil; k, v = cur.Next() {
m := &influxdb.Variable{}
if err := json.Unmarshal(v, m); err != nil {
return err
if filter.OrganizationID != nil {
return variable.OrganizationID == *filter.OrganizationID
}
if !fn(m) {
break
}
}
return nil
return true
}
}
// FindVariables returns all variables in the store
@ -170,152 +153,94 @@ func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFil
res = variables
return nil
})
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return res, nil
return res, err
}
// FindVariableByID finds a single variable in the store by its ID
func (s *Service) FindVariableByID(ctx context.Context, id influxdb.ID) (*influxdb.Variable, error) {
var variable *influxdb.Variable
err := s.kv.View(ctx, func(tx Tx) error {
m, pe := s.findVariableByID(ctx, tx, id)
if pe != nil {
return &influxdb.Error{
Err: pe,
}
m, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return err
}
variable = m
return nil
})
if err != nil {
return nil, err
}
return variable, nil
return variable, err
}
func (s *Service) findVariableByID(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Variable, error) {
encID, err := id.Encode()
body, err := s.variableStore.FindByID(ctx, tx, id)
if err != nil {
return nil, err
}
var v influxdb.Variable
if err := json.Unmarshal(body, &v); err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Code: influxdb.EInternal,
Err: err,
}
}
b, err := tx.Bucket(variableBucket)
if err != nil {
return nil, err
}
d, err := b.Get(encID)
if IsNotFound(err) {
return nil, &influxdb.Error{
Code: influxdb.ENotFound,
Msg: influxdb.ErrVariableNotFound,
}
}
if err != nil {
return nil, err
}
variable := &influxdb.Variable{}
err = json.Unmarshal(d, &variable)
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
return variable, nil
return &v, nil
}
// CreateVariable creates a new variable and assigns it an ID
func (s *Service) CreateVariable(ctx context.Context, variable *influxdb.Variable) error {
func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) error {
return s.kv.Update(ctx, func(tx Tx) error {
if err := variable.Valid(); err != nil {
if err := v.Valid(); err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
variable.Name = strings.TrimSpace(variable.Name)
v.Name = strings.TrimSpace(v.Name) // TODO: move to service layer
if err := s.uniqueVariableName(ctx, tx, variable); err != nil {
return err
if _, err := s.variableStore.FindByName(ctx, tx, v.OrganizationID, v.Name); err == nil {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", v.Name),
}
}
variable.ID = s.IDGenerator.ID()
v.ID = s.IDGenerator.ID()
if err := s.putVariableOrgsIndex(ctx, tx, variable); err != nil {
if err := s.putVariableOrgsIndex(ctx, tx, v); err != nil {
return err
}
now := s.Now()
variable.CreatedAt = now
variable.UpdatedAt = now
if pe := s.putVariable(ctx, tx, variable); pe != nil {
return &influxdb.Error{
Err: pe,
}
}
return nil
v.CreatedAt = now
v.UpdatedAt = now
return s.putVariable(ctx, tx, v)
})
}
// ReplaceVariable puts a variable in the store
func (s *Service) ReplaceVariable(ctx context.Context, variable *influxdb.Variable) error {
func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error {
return s.kv.Update(ctx, func(tx Tx) error {
if err := s.putVariableOrgsIndex(ctx, tx, variable); err != nil {
if err := s.putVariableOrgsIndex(ctx, tx, v); err != nil {
return &influxdb.Error{
Err: err,
}
}
err := s.uniqueVariableName(ctx, tx, variable)
if err != nil {
if _, err := s.variableStore.FindByName(ctx, tx, v.OrganizationID, v.Name); err == nil {
return &influxdb.Error{
Err: err,
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", v.Name),
}
}
return s.putVariable(ctx, tx, variable)
return s.putVariable(ctx, tx, v)
})
}
func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) {
oID, err := variable.OrganizationID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad organization id",
}
}
mID, err := variable.ID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad variable id",
}
}
key := make([]byte, 0, influxdb.IDLength*2)
key = append(key, oID...)
key = append(key, mID...)
return key, nil
}
func (s *Service) putVariableOrgsIndex(ctx context.Context, tx Tx, variable *influxdb.Variable) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
key, err := encodeVariableOrgsIndex(variable)
if err != nil {
return err
@ -353,90 +278,69 @@ func (s *Service) removeVariableOrgsIndex(ctx context.Context, tx Tx, variable *
return nil
}
func (s *Service) putVariable(ctx context.Context, tx Tx, variable *influxdb.Variable) error {
m, err := json.Marshal(variable)
func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) error {
m, err := json.Marshal(v)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
encID, err := variable.ID.Encode()
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Code: influxdb.EInternal,
Err: err,
}
}
err = s.createVariableIndex(ctx, tx, variable)
if err != nil {
return &influxdb.Error{
Err: err,
}
ent := Entity{
ID: v.ID,
Name: v.Name,
OrgID: v.OrganizationID,
Body: m,
}
b, err := tx.Bucket(variableBucket)
if err != nil {
return err
}
if err := b.Put(encID, m); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
return s.variableStore.Put(ctx, tx, ent)
}
// UpdateVariable updates a single variable in the store with a changeset
func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error) {
var variable *influxdb.Variable
var v *influxdb.Variable
err := s.kv.Update(ctx, func(tx Tx) error {
m, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return &influxdb.Error{
Err: err,
}
return err
}
m.UpdatedAt = s.Now()
variable = m
v = m
if update.Name != "" {
update.Name = strings.TrimSpace(update.Name)
// TODO: should be moved to service layer
update.Name = strings.ToLower(strings.TrimSpace(update.Name))
err = s.deleteVariableIndex(ctx, tx, variable)
if err != nil {
return &influxdb.Error{
Err: err,
vbytes, err := s.variableStore.FindByName(ctx, tx, v.OrganizationID, update.Name)
if err == nil {
var existingVar influxdb.Variable
if err := json.Unmarshal(vbytes, &existingVar); err != nil {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}
if existingVar.ID != v.ID {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", update.Name),
}
}
}
variable.Name = update.Name
if err := s.uniqueVariableName(ctx, tx, variable); err != nil {
return &influxdb.Error{
Err: err,
}
if err := s.variableStore.deleteInIndex(ctx, tx, v.OrganizationID, v.Name); err != nil {
return err
}
v.Name = update.Name
}
if err := update.Apply(m); err != nil {
return &influxdb.Error{
Err: err,
}
}
update.Apply(m)
if err = s.putVariable(ctx, tx, variable); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
return s.putVariable(ctx, tx, v)
})
return variable, err
return v, err
}
// DeleteVariable removes a single variable from the store by its ID
@ -444,16 +348,7 @@ func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
v, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
encID, err := id.Encode()
if err != nil {
return &influxdb.Error{
Err: err,
}
return err
}
if err := s.removeVariableOrgsIndex(ctx, tx, v); err != nil {
@ -462,113 +357,30 @@ func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error {
}
}
b, err := tx.Bucket(variableBucket)
if err != nil {
return err
}
if err := s.deleteVariableIndex(ctx, tx, v); err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := b.Delete(encID); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
return s.variableStore.Delete(ctx, tx, id)
})
}
func variableAlreadyExistsError(v *influxdb.Variable) error {
return &influxdb.Error{
Code: influxdb.EConflict,
Msg: fmt.Sprintf("variable with name %s already exists", v.Name),
}
}
func variableIndexKey(v *influxdb.Variable) ([]byte, error) {
orgID, err := v.OrganizationID.Encode()
func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) {
oID, err := variable.OrganizationID.Encode()
if err != nil {
return nil, &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
Err: err,
Msg: "bad organization id",
}
}
k := make([]byte, influxdb.IDLength+len(v.Name))
copy(k, orgID)
copy(k[influxdb.IDLength:], []byte(strings.ToLower((v.Name))))
return k, nil
}
func (s *Service) deleteVariableIndex(ctx context.Context, tx Tx, v *influxdb.Variable) error {
idx, err := tx.Bucket(variablesIndex)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
idxKey, err := variableIndexKey(v)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := idx.Delete(idxKey); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
func (s *Service) createVariableIndex(ctx context.Context, tx Tx, v *influxdb.Variable) error {
encID, err := v.OrganizationID.Encode()
if err != nil {
return &influxdb.Error{
Err: err,
}
}
idxBkt, err := tx.Bucket(variablesIndex)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
idxKey, err := variableIndexKey(v)
if err != nil {
return &influxdb.Error{
Err: err,
}
}
if err := idxBkt.Put([]byte(idxKey), encID); err != nil {
return &influxdb.Error{
Err: err,
}
}
return nil
}
func (s *Service) uniqueVariableName(ctx context.Context, tx Tx, v *influxdb.Variable) error {
key, err := variableIndexKey(v)
if err != nil {
return err
}
err = s.unique(ctx, tx, variablesIndex, key)
if err == NotUniqueError {
return variableAlreadyExistsError(v)
}
return nil
mID, err := variable.ID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad variable id",
}
}
key := make([]byte, influxdb.IDLength*2)
copy(key, oID)
copy(key[influxdb.IDLength:], mID)
return key, nil
}

View File

@ -49,7 +49,7 @@ func initVariableService(s kv.Store, f influxdbtesting.VariableFields, t *testin
done := func() {
for _, variable := range f.Variables {
if err := svc.DeleteVariable(ctx, variable.ID); err != nil {
t.Fatalf("failed to clean up variables bolt test: %v", err)
t.Logf("failed to clean up variables bolt test: %v", err)
}
}
}

View File

@ -4,12 +4,16 @@ import (
"bytes"
"context"
"sort"
"strings"
"testing"
"time"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/stretchr/testify/require"
)
const (
@ -472,7 +476,7 @@ func FindVariableByID(init func(VariableFields, *testing.T) (platform.VariableSe
id platform.ID
}
type wants struct {
err error
err *platform.Error
variable *platform.Variable
}
@ -555,12 +559,21 @@ func FindVariableByID(init func(VariableFields, *testing.T) (platform.VariableSe
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, opPrefix, done := init(tt.fields, t)
s, _, done := init(tt.fields, t)
defer done()
ctx := context.Background()
variable, err := s.FindVariableByID(ctx, tt.args.id)
diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t)
if err != nil {
if tt.wants.err == nil {
require.NoError(t, err)
}
iErr, ok := err.(*platform.Error)
require.True(t, ok)
assert.Equal(t, iErr.Code, tt.wants.err.Code)
assert.Equal(t, strings.HasPrefix(iErr.Error(), tt.wants.err.Error()), true)
return
}
if diff := cmp.Diff(variable, tt.wants.variable); diff != "" {
t.Fatalf("found unexpected variable -got/+want\ndiff %s", diff)
@ -781,7 +794,7 @@ func UpdateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
update *platform.VariableUpdate
}
type wants struct {
err error
err *platform.Error
variables []*platform.Variable
}
@ -958,7 +971,7 @@ func UpdateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
},
},
{
name: "trims the variable name, but updating fails when variable name already exists",
name: "trims the variable name but updating fails when variable name already exists",
fields: VariableFields{
TimeGenerator: fakeGenerator,
Variables: []*platform.Variable{
@ -1036,12 +1049,21 @@ func UpdateVariable(init func(VariableFields, *testing.T) (platform.VariableServ
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, opPrefix, done := init(tt.fields, t)
s, _, done := init(tt.fields, t)
defer done()
ctx := context.Background()
variable, err := s.UpdateVariable(ctx, tt.args.id, tt.args.update)
diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t)
if err != nil {
if tt.wants.err == nil {
require.NoError(t, err)
}
iErr, ok := err.(*platform.Error)
require.True(t, ok)
assert.Equal(t, iErr.Code, tt.wants.err.Code)
assert.Equal(t, strings.HasPrefix(iErr.Error(), tt.wants.err.Error()), true)
return
}
if variable != nil {
if tt.args.update.Name != "" && variable.Name != tt.args.update.Name {
@ -1066,7 +1088,7 @@ func DeleteVariable(init func(VariableFields, *testing.T) (platform.VariableServ
id platform.ID
}
type wants struct {
err error
err *platform.Error
variables []*platform.Variable
}
@ -1152,7 +1174,7 @@ func DeleteVariable(init func(VariableFields, *testing.T) (platform.VariableServ
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, opPrefix, done := init(tt.fields, t)
s, _, done := init(tt.fields, t)
defer done()
ctx := context.Background()
@ -1161,7 +1183,16 @@ func DeleteVariable(init func(VariableFields, *testing.T) (platform.VariableServ
ID: tt.args.id,
OrganizationID: platform.ID(1),
})
diffPlatformErrors(tt.name, err, tt.wants.err, opPrefix, t)
if err != nil {
if tt.wants.err == nil {
require.NoError(t, err)
}
iErr, ok := err.(*platform.Error)
require.True(t, ok)
assert.Equal(t, iErr.Code, tt.wants.err.Code)
assert.Equal(t, strings.HasPrefix(iErr.Error(), tt.wants.err.Error()), true)
return
}
variables, err := s.FindVariables(ctx, platform.VariableFilter{})
if err != nil {

View File

@ -140,7 +140,7 @@ func (u *VariableUpdate) Valid() error {
}
// Apply applies non-zero fields from a VariableUpdate to a Variable
func (u *VariableUpdate) Apply(m *Variable) error {
func (u *VariableUpdate) Apply(m *Variable) {
if u.Name != "" {
m.Name = u.Name
}
@ -156,8 +156,6 @@ func (u *VariableUpdate) Apply(m *Variable) error {
if u.Description != "" {
m.Description = u.Description
}
return nil
}
// UnmarshalJSON unmarshals json into a VariableArguments struct, using the `Type`