diff --git a/kv/service.go b/kv/service.go index 4f8ca8770c..f2c576da08 100644 --- a/kv/service.go +++ b/kv/service.go @@ -147,6 +147,10 @@ func (s *Service) Initialize(ctx context.Context) error { return err } + if err := s.initializeVariablesOrgIndex(tx); err != nil { + return err + } + if err := s.initializeChecks(ctx, tx); err != nil { return err } diff --git a/kv/variable.go b/kv/variable.go index c81f46cd4a..1589c6f24f 100644 --- a/kv/variable.go +++ b/kv/variable.go @@ -1,6 +1,7 @@ package kv import ( + "bytes" "context" "encoding/json" "fmt" @@ -9,6 +10,78 @@ import ( "github.com/influxdata/influxdb" ) +// TODO: eradicate this with migration strategy +var variableOrgsIndex = []byte("variableorgsv1") + +func (s *Service) initializeVariablesOrgIndex(tx Tx) error { + if _, err := tx.Bucket(variableOrgsIndex); err != nil { + return err + } + return nil +} + +func decodeVariableOrgsIndexKey(indexKey []byte) (orgID influxdb.ID, variableID influxdb.ID, err error) { + if len(indexKey) != 2*influxdb.IDLength { + return 0, 0, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "malformed variable orgs index key (please report this error)", + } + } + + if err := (&orgID).Decode(indexKey[:influxdb.IDLength]); err != nil { + return 0, 0, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "bad org id", + Err: influxdb.ErrInvalidID, + } + } + + if err := (&variableID).Decode(indexKey[influxdb.IDLength:]); err != nil { + return 0, 0, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "bad variable id", + Err: influxdb.ErrInvalidID, + } + } + + return orgID, variableID, nil +} + +func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) { + idx, err := tx.Bucket(variableOrgsIndex) + if err != nil { + return nil, err + } + + // TODO(leodido): support find options + cur, err := idx.Cursor() + if err != nil { + return nil, err + } + + prefix, err := orgID.Encode() + if err != nil { + return nil, err + } + + variables := []*influxdb.Variable{} + for k, _ := cur.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = cur.Next() { + _, id, err := decodeVariableOrgsIndexKey(k) + if err != nil { + return nil, err + } + + m, err := s.findVariableByID(ctx, tx, id) + if err != nil { + return nil, err + } + + variables = append(variables, m) + } + + return variables, nil +} + func newVariableUniqueByNameStore() *IndexStore { const resource = "variable" @@ -37,31 +110,6 @@ func newVariableUniqueByNameStore() *IndexStore { } } -func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) { - prefix, err := orgID.Encode() - if err != nil { - return nil, err - } - - variables := []*influxdb.Variable{} - err = s.variableStore.Find(ctx, tx, FindOpts{ - Prefix: prefix, - CaptureFn: func(key []byte, decodedVal interface{}) error { - v, ok := decodedVal.(*influxdb.Variable) - if err := errUnexpectedDecodeVal(ok); err != nil { - return err - } - variables = append(variables, v) - return nil - }, - }) - if err != nil { - return nil, err - } - - return variables, nil -} - func (s *Service) findVariables(ctx context.Context, tx Tx, filter influxdb.VariableFilter) ([]*influxdb.Variable, error) { if filter.OrganizationID != nil { return s.findOrganizationVariables(ctx, tx, *filter.OrganizationID) @@ -176,6 +224,7 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro now := s.Now() v.CreatedAt = now v.UpdatedAt = now + return s.putVariable(ctx, tx, v) }) } @@ -200,6 +249,10 @@ func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) err } func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) error { + if err := s.putVariableOrgsIndex(tx, v); err != nil { + return err + } + ent := Entity{ ID: v.ID, Name: v.Name, @@ -263,6 +316,74 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in // DeleteVariable removes a single variable from the store by its ID func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error { return s.kv.Update(ctx, func(tx Tx) error { + v, err := s.findVariableByID(ctx, tx, id) + if err != nil { + return err + } + + if err := s.removeVariableOrgsIndex(tx, v); err != nil { + return err + } return s.variableStore.DeleteEnt(ctx, tx, Entity{ID: id}) }) } + +func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) { + oID, err := variable.OrganizationID.Encode() + if err != nil { + return nil, &influxdb.Error{ + Err: err, + Msg: "bad organization id", + } + } + + mID, err := variable.ID.Encode() + if err != nil { + return nil, &influxdb.Error{ + Err: err, + Msg: "bad variable id", + } + } + + key := make([]byte, 0, influxdb.IDLength*2) + key = append(key, oID...) + key = append(key, mID...) + + return key, nil +} + +func (s *Service) putVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error { + key, err := encodeVariableOrgsIndex(variable) + if err != nil { + return err + } + + idx, err := tx.Bucket(variableOrgsIndex) + if err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + if err := idx.Put(key, nil); err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + return nil +} + +func (s *Service) removeVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error { + key, err := encodeVariableOrgsIndex(variable) + if err != nil { + return err + } + + idx, err := tx.Bucket(variableOrgsIndex) + if err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + if err := idx.Delete(key); err != nil { + return &influxdb.Error{Code: influxdb.EInternal, Err: err} + } + + return nil +}