2019-02-19 23:47:19 +00:00
|
|
|
package kv
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
2019-11-04 20:28:21 +00:00
|
|
|
"strings"
|
2019-06-18 22:05:40 +00:00
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
|
|
|
"github.com/influxdata/influxdb/v2/kit/platform/errors"
|
|
|
|
|
2020-04-03 17:39:20 +00:00
|
|
|
"github.com/influxdata/influxdb/v2"
|
2019-02-19 23:47:19 +00:00
|
|
|
)
|
|
|
|
|
2020-07-01 11:08:20 +00:00
|
|
|
var (
|
|
|
|
variableBucket = []byte("variablesv1")
|
|
|
|
variableIndexBucket = []byte("variablesindexv1")
|
|
|
|
// TODO: eradicate this with migration strategy
|
|
|
|
variableOrgsIndex = []byte("variableorgsv1")
|
|
|
|
)
|
2019-12-31 18:28:26 +00:00
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
func decodeVariableOrgsIndexKey(indexKey []byte) (orgID platform.ID, variableID platform.ID, err error) {
|
|
|
|
if len(indexKey) != 2*platform.IDLength {
|
|
|
|
return 0, 0, &errors.Error{
|
|
|
|
Code: errors.EInvalid,
|
2019-12-31 18:28:26 +00:00
|
|
|
Msg: "malformed variable orgs index key (please report this error)",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
if err := (&orgID).Decode(indexKey[:platform.IDLength]); err != nil {
|
|
|
|
return 0, 0, &errors.Error{
|
|
|
|
Code: errors.EInvalid,
|
2019-12-31 18:28:26 +00:00
|
|
|
Msg: "bad org id",
|
2021-03-30 18:10:02 +00:00
|
|
|
Err: platform.ErrInvalidID,
|
2019-12-31 18:28:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
if err := (&variableID).Decode(indexKey[platform.IDLength:]); err != nil {
|
|
|
|
return 0, 0, &errors.Error{
|
|
|
|
Code: errors.EInvalid,
|
2019-12-31 18:28:26 +00:00
|
|
|
Msg: "bad variable id",
|
2021-03-30 18:10:02 +00:00
|
|
|
Err: platform.ErrInvalidID,
|
2019-12-31 18:28:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return orgID, variableID, nil
|
|
|
|
}
|
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
func (s *Service) findOrganizationVariables(ctx context.Context, tx Tx, orgID platform.ID) ([]*influxdb.Variable, error) {
|
2019-12-31 18:28:26 +00:00
|
|
|
idx, err := tx.Bucket(variableOrgsIndex)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-02-03 15:50:06 +00:00
|
|
|
prefix, err := orgID.Encode()
|
2019-12-31 18:28:26 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-02-03 15:50:06 +00:00
|
|
|
cur, err := idx.ForwardCursor(prefix, WithCursorPrefix(prefix))
|
2019-12-31 18:28:26 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
variables := []*influxdb.Variable{}
|
2020-02-03 15:50:06 +00:00
|
|
|
for k, _ := cur.Next(); k != nil; k, _ = cur.Next() {
|
2019-12-31 18:28:26 +00:00
|
|
|
_, id, err := decodeVariableOrgsIndexKey(k)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
m, err := s.findVariableByID(ctx, tx, id)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
variables = append(variables, m)
|
|
|
|
}
|
|
|
|
|
|
|
|
return variables, nil
|
|
|
|
}
|
|
|
|
|
2019-12-31 03:11:53 +00:00
|
|
|
func newVariableStore() *IndexStore {
|
2019-12-27 02:15:14 +00:00
|
|
|
const resource = "variable"
|
2019-02-19 23:47:19 +00:00
|
|
|
|
2019-12-31 03:11:53 +00:00
|
|
|
var decodeVarEntFn DecodeBucketValFn = func(key, val []byte) ([]byte, interface{}, error) {
|
2019-12-27 02:15:14 +00:00
|
|
|
var v influxdb.Variable
|
|
|
|
return key, &v, json.Unmarshal(val, &v)
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
2019-12-31 03:11:53 +00:00
|
|
|
var decValToEntFn ConvertValToEntFn = func(_ []byte, i interface{}) (entity Entity, err error) {
|
2019-12-27 02:15:14 +00:00
|
|
|
v, ok := i.(*influxdb.Variable)
|
2020-01-06 23:31:50 +00:00
|
|
|
if err := IsErrUnexpectedDecodeVal(ok); err != nil {
|
2019-12-27 02:15:14 +00:00
|
|
|
return Entity{}, err
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
2019-12-27 02:15:14 +00:00
|
|
|
return Entity{
|
2020-01-01 00:28:55 +00:00
|
|
|
PK: EncID(v.ID),
|
|
|
|
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)),
|
|
|
|
Body: v,
|
2019-12-27 02:15:14 +00:00
|
|
|
}, nil
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
2019-12-27 02:15:14 +00:00
|
|
|
return &IndexStore{
|
|
|
|
Resource: resource,
|
2020-07-01 11:08:20 +00:00
|
|
|
EntStore: NewStoreBase(resource, variableBucket, EncIDKey, EncBodyJSON, decodeVarEntFn, decValToEntFn),
|
|
|
|
IndexStore: NewOrgNameKeyStore(resource, variableIndexBucket, false),
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-31 03:11:53 +00:00
|
|
|
func (s *Service) findVariables(ctx context.Context, tx Tx, filter influxdb.VariableFilter, opt ...influxdb.FindOptions) ([]*influxdb.Variable, error) {
|
2019-02-19 23:47:19 +00:00
|
|
|
if filter.OrganizationID != nil {
|
|
|
|
return s.findOrganizationVariables(ctx, tx, *filter.OrganizationID)
|
|
|
|
}
|
|
|
|
|
2019-12-31 03:11:53 +00:00
|
|
|
var o influxdb.FindOptions
|
|
|
|
if len(opt) > 0 {
|
|
|
|
o = opt[0]
|
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
// TODO(jsteenb2): investigate why we don't implement the find options for vars?
|
2019-12-31 03:11:53 +00:00
|
|
|
variables := make([]*influxdb.Variable, 0)
|
2019-12-27 02:15:14 +00:00
|
|
|
err := s.variableStore.Find(ctx, tx, FindOpts{
|
2019-12-31 03:11:53 +00:00
|
|
|
Descending: o.Descending,
|
|
|
|
Limit: o.Limit,
|
|
|
|
Offset: o.Offset,
|
|
|
|
FilterEntFn: filterVariablesFn(filter),
|
2019-12-27 02:15:14 +00:00
|
|
|
CaptureFn: func(key []byte, decodedVal interface{}) error {
|
|
|
|
variables = append(variables, decodedVal.(*influxdb.Variable))
|
|
|
|
return nil
|
|
|
|
},
|
2019-12-26 07:35:42 +00:00
|
|
|
})
|
2019-02-19 23:47:19 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return variables, nil
|
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
func filterVariablesFn(filter influxdb.VariableFilter) func([]byte, interface{}) bool {
|
|
|
|
return func(key []byte, val interface{}) bool {
|
|
|
|
variable, ok := val.(*influxdb.Variable)
|
|
|
|
if !ok {
|
|
|
|
return false
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
if filter.ID != nil {
|
|
|
|
return variable.ID == *filter.ID
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
if filter.OrganizationID != nil {
|
|
|
|
return variable.OrganizationID == *filter.OrganizationID
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
return true
|
|
|
|
}
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// FindVariables returns all variables in the store
|
|
|
|
func (s *Service) FindVariables(ctx context.Context, filter influxdb.VariableFilter, opt ...influxdb.FindOptions) ([]*influxdb.Variable, error) {
|
refactor(kv): delete deprecated kv service code
This includes removal of a lot of kv.Service responsibilities. However,
it does not finish the re-wiring. It removes documents, telegrafs,
notification rules + endpoints, checks, orgs, users, buckets, passwords,
urms, labels and authorizations. There are some oustanding pieces that
are needed to get kv service compiling (dashboard service urm
dependency). Then all the call sites for kv service need updating and
the new implementations of telegraf and notification rules + endpoints
needed installing (along with any necessary migrations).
2020-10-20 13:25:36 +00:00
|
|
|
if filter.Organization != nil {
|
|
|
|
o, err := s.orgs.FindOrganization(ctx, influxdb.OrganizationFilter{
|
|
|
|
Name: filter.Organization,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
filter.OrganizationID = &o.ID
|
|
|
|
}
|
|
|
|
|
2019-02-19 23:47:19 +00:00
|
|
|
res := []*influxdb.Variable{}
|
2019-03-05 00:38:10 +00:00
|
|
|
err := s.kv.View(ctx, func(tx Tx) error {
|
2019-12-31 03:11:53 +00:00
|
|
|
variables, err := s.findVariables(ctx, tx, filter, opt...)
|
2021-03-30 18:10:02 +00:00
|
|
|
if err != nil && errors.ErrorCode(err) != errors.ENotFound {
|
2019-02-19 23:47:19 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
res = variables
|
|
|
|
return nil
|
|
|
|
})
|
2019-12-26 07:35:42 +00:00
|
|
|
return res, err
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// FindVariableByID finds a single variable in the store by its ID
|
2021-03-30 18:10:02 +00:00
|
|
|
func (s *Service) FindVariableByID(ctx context.Context, id platform.ID) (*influxdb.Variable, error) {
|
2019-02-19 23:47:19 +00:00
|
|
|
var variable *influxdb.Variable
|
2019-03-05 00:38:10 +00:00
|
|
|
err := s.kv.View(ctx, func(tx Tx) error {
|
2019-12-26 07:35:42 +00:00
|
|
|
m, err := s.findVariableByID(ctx, tx, id)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
variable = m
|
|
|
|
return nil
|
|
|
|
})
|
2019-12-26 07:35:42 +00:00
|
|
|
return variable, err
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
func (s *Service) findVariableByID(ctx context.Context, tx Tx, id platform.ID) (*influxdb.Variable, error) {
|
2020-01-01 00:28:55 +00:00
|
|
|
body, err := s.variableStore.FindEnt(ctx, tx, Entity{PK: EncID(id)})
|
2019-02-19 23:47:19 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2019-12-27 02:15:14 +00:00
|
|
|
variable, ok := body.(*influxdb.Variable)
|
2020-01-06 23:31:50 +00:00
|
|
|
return variable, IsErrUnexpectedDecodeVal(ok)
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// CreateVariable creates a new variable and assigns it an ID
|
2019-12-26 07:35:42 +00:00
|
|
|
func (s *Service) CreateVariable(ctx context.Context, v *influxdb.Variable) error {
|
2019-03-05 00:38:10 +00:00
|
|
|
return s.kv.Update(ctx, func(tx Tx) error {
|
2019-12-26 07:35:42 +00:00
|
|
|
if err := v.Valid(); err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return &errors.Error{
|
|
|
|
Code: errors.EInvalid,
|
2019-11-04 20:28:21 +00:00
|
|
|
Err: err,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
v.ID = s.IDGenerator.ID()
|
2019-06-18 17:06:28 +00:00
|
|
|
now := s.Now()
|
2019-12-26 07:35:42 +00:00
|
|
|
v.CreatedAt = now
|
|
|
|
v.UpdatedAt = now
|
2020-01-06 23:31:50 +00:00
|
|
|
return s.putVariable(ctx, tx, v, PutNew())
|
2019-02-19 23:47:19 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-09-09 19:50:02 +00:00
|
|
|
// ReplaceVariable replaces a variable that exists in the store or creates it if it does not
|
2019-12-26 07:35:42 +00:00
|
|
|
func (s *Service) ReplaceVariable(ctx context.Context, v *influxdb.Variable) error {
|
2019-03-05 00:38:10 +00:00
|
|
|
return s.kv.Update(ctx, func(tx Tx) error {
|
2020-09-09 19:50:02 +00:00
|
|
|
if found, _ := s.findVariableByID(ctx, tx, v.ID); found != nil {
|
|
|
|
return s.putVariable(ctx, tx, v, PutUpdate())
|
|
|
|
}
|
2020-01-06 23:31:50 +00:00
|
|
|
return s.putVariable(ctx, tx, v, PutNew())
|
2019-02-19 23:47:19 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-01-06 23:31:50 +00:00
|
|
|
func (s *Service) putVariable(ctx context.Context, tx Tx, v *influxdb.Variable, putOpts ...PutOptionFn) error {
|
2019-12-31 18:28:26 +00:00
|
|
|
if err := s.putVariableOrgsIndex(tx, v); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
ent := Entity{
|
2020-01-01 00:28:55 +00:00
|
|
|
PK: EncID(v.ID),
|
|
|
|
UniqueKey: Encode(EncID(v.OrganizationID), EncStringCaseInsensitive(v.Name)),
|
|
|
|
Body: v,
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
2020-01-06 23:31:50 +00:00
|
|
|
return s.variableStore.Put(ctx, tx, ent, putOpts...)
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateVariable updates a single variable in the store with a changeset
|
2021-03-30 18:10:02 +00:00
|
|
|
func (s *Service) UpdateVariable(ctx context.Context, id platform.ID, update *influxdb.VariableUpdate) (*influxdb.Variable, error) {
|
2019-12-26 07:35:42 +00:00
|
|
|
var v *influxdb.Variable
|
2019-03-05 00:38:10 +00:00
|
|
|
err := s.kv.Update(ctx, func(tx Tx) error {
|
2019-11-04 20:28:21 +00:00
|
|
|
m, err := s.findVariableByID(ctx, tx, id)
|
|
|
|
if err != nil {
|
2019-12-26 07:35:42 +00:00
|
|
|
return err
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
2019-06-18 22:05:40 +00:00
|
|
|
m.UpdatedAt = s.Now()
|
2019-12-26 07:35:42 +00:00
|
|
|
v = m
|
2019-11-04 20:28:21 +00:00
|
|
|
|
2020-01-06 23:31:50 +00:00
|
|
|
// TODO: should be moved to service layer
|
2020-01-07 17:09:16 +00:00
|
|
|
update.Name = strings.TrimSpace(update.Name)
|
2019-12-26 07:35:42 +00:00
|
|
|
update.Apply(m)
|
|
|
|
|
2020-01-06 23:31:50 +00:00
|
|
|
return s.putVariable(ctx, tx, v, PutUpdate())
|
2019-02-19 23:47:19 +00:00
|
|
|
})
|
|
|
|
|
2019-12-26 07:35:42 +00:00
|
|
|
return v, err
|
2019-02-19 23:47:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteVariable removes a single variable from the store by its ID
|
2021-03-30 18:10:02 +00:00
|
|
|
func (s *Service) DeleteVariable(ctx context.Context, id platform.ID) error {
|
2019-03-05 00:38:10 +00:00
|
|
|
return s.kv.Update(ctx, func(tx Tx) error {
|
2019-12-31 18:28:26 +00:00
|
|
|
v, err := s.findVariableByID(ctx, tx, id)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := s.removeVariableOrgsIndex(tx, v); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-01-01 00:28:55 +00:00
|
|
|
return s.variableStore.DeleteEnt(ctx, tx, Entity{PK: EncID(id)})
|
2019-02-19 23:47:19 +00:00
|
|
|
})
|
|
|
|
}
|
2019-12-31 18:28:26 +00:00
|
|
|
|
|
|
|
func encodeVariableOrgsIndex(variable *influxdb.Variable) ([]byte, error) {
|
|
|
|
oID, err := variable.OrganizationID.Encode()
|
|
|
|
if err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return nil, &errors.Error{
|
2019-12-31 18:28:26 +00:00
|
|
|
Err: err,
|
|
|
|
Msg: "bad organization id",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
mID, err := variable.ID.Encode()
|
|
|
|
if err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return nil, &errors.Error{
|
2019-12-31 18:28:26 +00:00
|
|
|
Err: err,
|
|
|
|
Msg: "bad variable id",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-30 18:10:02 +00:00
|
|
|
key := make([]byte, 0, platform.IDLength*2)
|
2019-12-31 18:28:26 +00:00
|
|
|
key = append(key, oID...)
|
|
|
|
key = append(key, mID...)
|
|
|
|
|
|
|
|
return key, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Service) putVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error {
|
|
|
|
key, err := encodeVariableOrgsIndex(variable)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
idx, err := tx.Bucket(variableOrgsIndex)
|
|
|
|
if err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return &errors.Error{Code: errors.EInternal, Err: err}
|
2019-12-31 18:28:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := idx.Put(key, nil); err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return &errors.Error{Code: errors.EInternal, Err: err}
|
2019-12-31 18:28:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Service) removeVariableOrgsIndex(tx Tx, variable *influxdb.Variable) error {
|
|
|
|
key, err := encodeVariableOrgsIndex(variable)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
idx, err := tx.Bucket(variableOrgsIndex)
|
|
|
|
if err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return &errors.Error{Code: errors.EInternal, Err: err}
|
2019-12-31 18:28:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := idx.Delete(key); err != nil {
|
2021-03-30 18:10:02 +00:00
|
|
|
return &errors.Error{Code: errors.EInternal, Err: err}
|
2019-12-31 18:28:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|