236 lines
6.0 KiB
Go
236 lines
6.0 KiB
Go
package pkger
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/kv"
|
|
)
|
|
|
|
type (
|
|
entStack struct {
|
|
ID []byte `json:"id"`
|
|
OrgID []byte `json:"orgID"`
|
|
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
URLs []string `json:"urls,omitempty"`
|
|
Resources []entStackResource `json:"resources,omitempty"`
|
|
|
|
CreatedAt time.Time `json:"createdAt"`
|
|
UpdatedAt time.Time `json:"updatedAt"`
|
|
}
|
|
|
|
entStackResource struct {
|
|
APIVersion string `json:"apiVersion"`
|
|
ID string `json:"id"`
|
|
Kind string `json:"kind"`
|
|
Name string `json:"name"`
|
|
}
|
|
)
|
|
|
|
// StoreKV is a store implementation that uses a kv store backing.
|
|
type StoreKV struct {
|
|
kvStore kv.Store
|
|
indexBase *kv.IndexStore
|
|
}
|
|
|
|
var _ Store = (*StoreKV)(nil)
|
|
|
|
// NewStoreKV creates a new StoreKV entity. This does not initialize the store. You will
|
|
// want to init it if you want to have this init donezo at startup. If not it'll lazy
|
|
// load the buckets as they are used.
|
|
func NewStoreKV(store kv.Store) *StoreKV {
|
|
const resource = "pkg stack"
|
|
|
|
storeKV := &StoreKV{
|
|
kvStore: store,
|
|
}
|
|
storeKV.indexBase = &kv.IndexStore{
|
|
Resource: resource,
|
|
EntStore: storeKV.entStoreBase(resource),
|
|
IndexStore: storeKV.indexStoreBase(resource),
|
|
}
|
|
return storeKV
|
|
}
|
|
|
|
// Init will initialize the all required buckets for the kv store. If not called, will be
|
|
// called implicitly on first read/write operation.
|
|
func (s *StoreKV) Init(ctx context.Context) error {
|
|
return s.kvStore.Update(ctx, func(tx kv.Tx) error {
|
|
return s.indexBase.Init(ctx, tx)
|
|
})
|
|
}
|
|
|
|
// CreateStack will create a new stack. If collisions are found will fail.
|
|
func (s *StoreKV) CreateStack(ctx context.Context, stack Stack) error {
|
|
return s.put(ctx, stack, kv.PutNew())
|
|
}
|
|
|
|
// ReadStackByID reads a stack by the provided ID.
|
|
func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
|
|
var stack Stack
|
|
err := s.kvStore.View(ctx, func(tx kv.Tx) error {
|
|
decodedEnt, err := s.indexBase.FindEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stack, err = convertStackEntToStack(decodedEnt.(*entStack))
|
|
return err
|
|
})
|
|
return stack, err
|
|
}
|
|
|
|
// UpdateStack updates a stack.
|
|
func (s *StoreKV) UpdateStack(ctx context.Context, stack Stack) error {
|
|
existing, err := s.ReadStackByID(ctx, stack.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if stack.OrgID != existing.OrgID {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EUnprocessableEntity,
|
|
Msg: "org id does not match",
|
|
}
|
|
}
|
|
|
|
return s.put(ctx, stack, kv.PutUpdate())
|
|
}
|
|
|
|
// DeleteStack deletes a stack by id.
|
|
func (s *StoreKV) DeleteStack(ctx context.Context, id influxdb.ID) error {
|
|
return s.kvStore.Update(ctx, func(tx kv.Tx) error {
|
|
return s.indexBase.DeleteEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)})
|
|
})
|
|
}
|
|
|
|
func (s *StoreKV) put(ctx context.Context, stack Stack, opts ...kv.PutOptionFn) error {
|
|
ent, err := convertStackToEnt(stack)
|
|
if err != nil {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EInvalid,
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return s.kvStore.Update(ctx, func(tx kv.Tx) error {
|
|
return s.indexBase.Put(ctx, tx, ent, opts...)
|
|
})
|
|
}
|
|
|
|
func (s *StoreKV) entStoreBase(resource string) *kv.StoreBase {
|
|
var decodeEntFn kv.DecodeBucketValFn = func(key, val []byte) (keyRepeat []byte, decodedVal interface{}, err error) {
|
|
var stack entStack
|
|
return key, &stack, json.Unmarshal(val, &stack)
|
|
}
|
|
|
|
var decValToEntFn kv.ConvertValToEntFn = func(k []byte, i interface{}) (kv.Entity, error) {
|
|
s, ok := i.(*entStack)
|
|
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
|
|
return kv.Entity{}, err
|
|
}
|
|
|
|
return kv.Entity{
|
|
PK: kv.EncBytes(s.ID),
|
|
UniqueKey: kv.Encode(kv.EncBytes(s.OrgID), kv.EncBytes(s.ID)),
|
|
Body: s,
|
|
}, nil
|
|
}
|
|
|
|
entityBucket := []byte("v1_pkger_stacks")
|
|
|
|
return kv.NewStoreBase(resource, entityBucket, kv.EncIDKey, kv.EncBodyJSON, decodeEntFn, decValToEntFn)
|
|
}
|
|
|
|
func (s *StoreKV) indexStoreBase(resource string) *kv.StoreBase {
|
|
var decValToEntFn kv.ConvertValToEntFn = func(k []byte, v interface{}) (kv.Entity, error) {
|
|
id, ok := v.(influxdb.ID)
|
|
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
|
|
return kv.Entity{}, err
|
|
}
|
|
|
|
return kv.Entity{
|
|
PK: kv.EncID(id),
|
|
UniqueKey: kv.EncBytes(k),
|
|
}, nil
|
|
}
|
|
|
|
indexBucket := []byte("v1_pkger_stacks_index")
|
|
|
|
return kv.NewStoreBase(resource, indexBucket, kv.EncUniqKey, kv.EncIDKey, kv.DecIndexID, decValToEntFn)
|
|
}
|
|
|
|
func convertStackToEnt(stack Stack) (kv.Entity, error) {
|
|
idBytes, err := stack.ID.Encode()
|
|
if err != nil {
|
|
return kv.Entity{}, err
|
|
}
|
|
|
|
orgIDBytes, err := stack.OrgID.Encode()
|
|
if err != nil {
|
|
return kv.Entity{}, err
|
|
}
|
|
|
|
stEnt := entStack{
|
|
ID: idBytes,
|
|
OrgID: orgIDBytes,
|
|
Name: stack.Name,
|
|
Description: stack.Description,
|
|
CreatedAt: stack.CreatedAt,
|
|
UpdatedAt: stack.UpdatedAt,
|
|
URLs: stack.URLs,
|
|
}
|
|
|
|
for _, res := range stack.Resources {
|
|
stEnt.Resources = append(stEnt.Resources, entStackResource{
|
|
APIVersion: res.APIVersion,
|
|
ID: res.ID.String(),
|
|
Kind: res.Kind.String(),
|
|
Name: res.Name,
|
|
})
|
|
}
|
|
|
|
return kv.Entity{
|
|
PK: kv.EncBytes(stEnt.ID),
|
|
UniqueKey: kv.Encode(kv.EncBytes(stEnt.OrgID), kv.EncBytes(stEnt.ID)),
|
|
Body: stEnt,
|
|
}, nil
|
|
}
|
|
|
|
func convertStackEntToStack(ent *entStack) (Stack, error) {
|
|
stack := Stack{
|
|
Name: ent.Name,
|
|
Description: ent.Description,
|
|
URLs: ent.URLs,
|
|
CRUDLog: influxdb.CRUDLog{
|
|
CreatedAt: ent.CreatedAt,
|
|
UpdatedAt: ent.UpdatedAt,
|
|
},
|
|
}
|
|
if err := stack.ID.Decode(ent.ID); err != nil {
|
|
return Stack{}, err
|
|
}
|
|
|
|
if err := stack.OrgID.Decode(ent.OrgID); err != nil {
|
|
return Stack{}, err
|
|
}
|
|
|
|
for _, res := range ent.Resources {
|
|
stackRes := StackResource{
|
|
APIVersion: res.APIVersion,
|
|
Kind: Kind(res.Kind),
|
|
Name: res.Name,
|
|
}
|
|
if err := stackRes.ID.DecodeFromString(res.ID); err != nil {
|
|
return Stack{}, nil
|
|
}
|
|
|
|
stack.Resources = append(stack.Resources, stackRes)
|
|
}
|
|
|
|
return stack, nil
|
|
}
|