fix(kv): add legacy variable orgs index back

issue here is that the unique by name index for variables was implemented
and has the same functionality about it that this orgs index has. The duplicative
orgs index was nuked. The migration to hydrate the org/name index never
happened. This is a stop gap until that migration is in place.
pull/16367/head
Johnny Steenbergen 2019-12-31 10:28:26 -08:00 committed by Johnny Steenbergen
parent a926700965
commit 4fb855feff
2 changed files with 150 additions and 25 deletions

View File

@ -147,6 +147,10 @@ func (s *Service) Initialize(ctx context.Context) error {
return err
}
if err := s.initializeVariablesOrgIndex(tx); err != nil {
return err
}
if err := s.initializeChecks(ctx, tx); err != nil {
return err
}

View File

@ -1,6 +1,7 @@
package kv
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -9,6 +10,78 @@ import (
"github.com/influxdata/influxdb"
)
// TODO: eradicate this with migration strategy
var variableOrgsIndex = []byte("variableorgsv1")
func (s *Service) initializeVariablesOrgIndex(tx Tx) error {
if _, err := tx.Bucket(variableOrgsIndex); err != nil {
return err
}
return nil
}
func decodeVariableOrgsIndexKey(indexKey []byte) (orgID influxdb.ID, variableID influxdb.ID, err error) {
if len(indexKey) != 2*influxdb.IDLength {
return 0, 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "malformed variable orgs index key (please report this error)",
}
}
if err := (&orgID).Decode(indexKey[:influxdb.IDLength]); err != nil {
return 0, 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "bad org id",
Err: influxdb.ErrInvalidID,
}
}
if err := (&variableID).Decode(indexKey[influxdb.IDLength:]); err != nil {
return 0, 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "bad variable id",
Err: influxdb.ErrInvalidID,
}
}
return orgID, variableID, nil
}
func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) {
idx, err := tx.Bucket(variableOrgsIndex)
if err != nil {
return nil, err
}
// TODO(leodido): support find options
cur, err := idx.Cursor()
if err != nil {
return nil, err
}
prefix, err := orgID.Encode()
if err != nil {
return nil, err
}
variables := []*influxdb.Variable{}
for k, _ := cur.Seek(prefix); bytes.HasPrefix(k, prefix); k, _ = cur.Next() {
_, id, err := decodeVariableOrgsIndexKey(k)
if err != nil {
return nil, err
}
m, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return nil, err
}
variables = append(variables, m)
}
return variables, nil
}
func newVariableUniqueByNameStore() *IndexStore {
const resource = "variable"
@ -37,31 +110,6 @@ func newVariableUniqueByNameStore() *IndexStore {
}
}
func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID influxdb.ID) ([]*influxdb.Variable, error) {
prefix, err := orgID.Encode()
if err != nil {
return nil, err
}
variables := []*influxdb.Variable{}
err = s.variableStore.Find(ctx, tx, FindOpts{
Prefix: prefix,
CaptureFn: func(key []byte, decodedVal interface{}) error {
v, ok := decodedVal.(*influxdb.Variable)
if err := errUnexpectedDecodeVal(ok); err != nil {
return err
}
variables = append(variables, v)
return nil
},
})
if err != nil {
return nil, err
}
return variables, nil
}
func (s *Service) findVariables(ctx context.Context, tx Tx, filter influxdb.VariableFilter) ([]*influxdb.Variable, error) {
if filter.OrganizationID != nil {
return s.findOrganizationVariables(ctx, tx, *filter.OrganizationID)
@ -176,6 +224,7 @@ func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) erro
now := s.Now()
v.CreatedAt = now
v.UpdatedAt = now
return s.putVariable(ctx, tx, v)
})
}
@ -200,6 +249,10 @@ func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) err
}
func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable) error {
if err := s.putVariableOrgsIndex(tx, v); err != nil {
return err
}
ent := Entity{
ID: v.ID,
Name: v.Name,
@ -263,6 +316,74 @@ func (s *Service) UpdateVariable(ctx context.Context, id influxdb.ID, update *in
// DeleteVariable removes a single variable from the store by its ID
func (s *Service) DeleteVariable(ctx context.Context, id influxdb.ID) error {
return s.kv.Update(ctx, func(tx Tx) error {
v, err := s.findVariableByID(ctx, tx, id)
if err != nil {
return err
}
if err := s.removeVariableOrgsIndex(tx, v); err != nil {
return err
}
return s.variableStore.DeleteEnt(ctx, tx, Entity{ID: id})
})
}
func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) {
oID, err := variable.OrganizationID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad organization id",
}
}
mID, err := variable.ID.Encode()
if err != nil {
return nil, &influxdb.Error{
Err: err,
Msg: "bad variable id",
}
}
key := make([]byte, 0, influxdb.IDLength*2)
key = append(key, oID...)
key = append(key, mID...)
return key, nil
}
func (s *Service) putVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error {
key, err := encodeVariableOrgsIndex(variable)
if err != nil {
return err
}
idx, err := tx.Bucket(variableOrgsIndex)
if err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}
if err := idx.Put(key, nil); err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}
return nil
}
func (s *Service) removeVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error {
key, err := encodeVariableOrgsIndex(variable)
if err != nil {
return err
}
idx, err := tx.Bucket(variableOrgsIndex)
if err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}
if err := idx.Delete(key); err != nil {
return &influxdb.Error{Code: influxdb.EInternal, Err: err}
}
return nil
}