feat(pkger): add CRUD store implmentation

pull/17373/head
Johnny Steenbergen 2020-03-19 19:08:58 -07:00 committed by Johnny Steenbergen
parent 583e512451
commit e116ecaf5e
5 changed files with 446 additions and 3 deletions

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"io/ioutil"
"net/url"
"os"
"path"
"path/filepath"
@ -591,6 +592,10 @@ type fakePkgSVC struct {
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error)
}
func (f *fakePkgSVC) InitStack(ctx context.Context, orgID, userID influxdb.ID, urls ...url.URL) (pkger.Stack, error) {
panic("not implemented")
}
func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
if f.createFn != nil {
return f.createFn(ctx, setters...)

View File

@ -926,7 +926,7 @@ spec:
}
assert.Equal(t, expectedMissingEnvs, sum.MissingEnvs)
sum, err = svc.Apply(timedCtx(time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{
sum, err = svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{
"bkt-1-name-ref": "rucket_threeve",
"check-1-name-ref": "check_threeve",
"dash-1-name-ref": "dash_threeve",

View File

@ -19,14 +19,20 @@ import (
const APIVersion = "influxdata.com/v2alpha1"
type (
// Stack is an identifier for stateful application of a package(s). This stack
// will map created resources from the pkg(s) to existing resources on the
// platform. This stack is updated only after side effects of applying a pkg.
// If the pkg is applied, and no changes are had, then the stack is not updated.
Stack struct {
ID influxdb.ID
CreatedAt time.Time
UpdatedAt time.Time
URLS []url.URL
Resources []StackResource
influxdb.CRUDLog
}
// StackResource is a record for an individual resource side effect genereated from
// applying a pkg.
StackResource struct {
APIVersion string
ID influxdb.ID
@ -143,6 +149,7 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn {
}
}
// Store is the storage behavior the Service depends on.
type Store interface {
CreateStack(ctx context.Context, orgID influxdb.ID, stack Stack) error
ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error)
@ -203,9 +210,25 @@ func NewService(opts ...ServiceSetterFn) *Service {
}
}
// InitStack will create a new stack for the given user and its given org. The stack can be created
// with urls that point to the location of packages that are included as part of the stack when
// it is applied.
func (s *Service) InitStack(ctx context.Context, orgID, userID influxdb.ID, urls ...url.URL) (Stack, error) {
now := s.timeGen.Now()
stack := Stack{
ID: s.idGen.ID(),
CRUDLog: influxdb.CRUDLog{
CreatedAt: now,
UpdatedAt: now,
},
URLS: urls,
}
if err := s.store.CreateStack(ctx, orgID, stack); err != nil {
return Stack{}, err
}
return stack, nil
}

226
pkger/store.go Normal file
View File

@ -0,0 +1,226 @@
package pkger
import (
"context"
"encoding/json"
"net/url"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kv"
)
type (
entStack struct {
ID []byte `json:"id"`
OrgID []byte `json:"orgID"`
URLs []string `json:"urls,omitempty"`
Resources []entStackResource `json:"resources,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
entStackResource struct {
APIVersion string `json:"apiVersion"`
ID string `json:"id"`
Kind string `json:"kind"`
Name string `json:"name"`
}
)
// StoreKV is a store implementation that uses a kv store backing.
type StoreKV struct {
kvStore kv.Store
indexBase *kv.IndexStore
}
var _ Store = (*StoreKV)(nil)
// NewStoreKV creates a new StoreKV entity. This does not initialize the store. You will
// want to init it if you want to have this init donezo at startup. If not it'll lazy
// load the buckets as they are used.
func NewStoreKV(store kv.Store) *StoreKV {
const resource = "pkg stack"
storeKV := &StoreKV{
kvStore: store,
}
storeKV.indexBase = &kv.IndexStore{
Resource: resource,
EntStore: storeKV.entStoreBase(resource),
IndexStore: storeKV.indexStoreBase(resource),
}
return storeKV
}
// Init will initialize the all required buckets for the kv store. If not called, will be
// called implicitly on first read/write operation.
func (s *StoreKV) Init(ctx context.Context) error {
return s.kvStore.Update(ctx, func(tx kv.Tx) error {
return s.indexBase.Init(ctx, tx)
})
}
// CreateStack will create a new stack. If collisions are found will fail.
func (s *StoreKV) CreateStack(ctx context.Context, orgID influxdb.ID, stack Stack) error {
return s.put(ctx, orgID, stack, kv.PutNew())
}
// ReadStackByID reads a stack by the provided ID.
func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
var stack Stack
err := s.kvStore.View(ctx, func(tx kv.Tx) error {
decodedEnt, err := s.indexBase.FindEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)})
if err != nil {
return err
}
stack, err = convertStackEntToStack(decodedEnt.(*entStack))
return err
})
return stack, err
}
// UpdateStack updates a stack.
func (s *StoreKV) UpdateStack(ctx context.Context, orgID influxdb.ID, stack Stack) error {
return s.put(ctx, orgID, stack, kv.PutUpdate())
}
// DeleteStack delets a stack by id.
func (s *StoreKV) DeleteStack(ctx context.Context, id influxdb.ID) error {
return s.kvStore.Update(ctx, func(tx kv.Tx) error {
return s.indexBase.DeleteEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)})
})
}
func (s *StoreKV) put(ctx context.Context, orgID influxdb.ID, stack Stack, opts ...kv.PutOptionFn) error {
ent, err := convertStackToEnt(orgID, stack)
if err != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Err: err,
}
}
return s.kvStore.Update(ctx, func(tx kv.Tx) error {
return s.indexBase.Put(ctx, tx, ent, opts...)
})
}
func (s *StoreKV) entStoreBase(resource string) *kv.StoreBase {
var decodeEntFn kv.DecodeBucketValFn = func(key, val []byte) (keyRepeat []byte, decodedVal interface{}, err error) {
var stack entStack
return key, &stack, json.Unmarshal(val, &stack)
}
var decValToEntFn kv.ConvertValToEntFn = func(k []byte, i interface{}) (kv.Entity, error) {
s, ok := i.(*entStack)
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
return kv.Entity{}, err
}
return kv.Entity{
PK: kv.EncBytes(s.ID),
UniqueKey: kv.Encode(kv.EncBytes(s.OrgID), kv.EncBytes(s.ID)),
Body: s,
}, nil
}
entityBucket := []byte("v1_pkger_stacks")
return kv.NewStoreBase(resource, entityBucket, kv.EncIDKey, kv.EncBodyJSON, decodeEntFn, decValToEntFn)
}
func (s *StoreKV) indexStoreBase(resource string) *kv.StoreBase {
var decValToEntFn kv.ConvertValToEntFn = func(k []byte, v interface{}) (kv.Entity, error) {
id, ok := v.(influxdb.ID)
if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil {
return kv.Entity{}, err
}
return kv.Entity{
PK: kv.EncID(id),
UniqueKey: kv.EncBytes(k),
}, nil
}
indexBucket := []byte("v1_pkger_stacks_index")
return kv.NewStoreBase(resource, indexBucket, kv.EncUniqKey, kv.EncIDKey, kv.DecIndexID, decValToEntFn)
}
func convertStackToEnt(orgID influxdb.ID, stack Stack) (kv.Entity, error) {
idBytes, err := stack.ID.Encode()
if err != nil {
return kv.Entity{}, err
}
orgIDBytes, err := orgID.Encode()
if err != nil {
return kv.Entity{}, err
}
urlStrs := make([]string, 0, len(stack.URLS))
for _, u := range stack.URLS {
urlStrs = append(urlStrs, u.String())
}
stEnt := entStack{
ID: idBytes,
OrgID: orgIDBytes,
CreatedAt: stack.CreatedAt,
UpdatedAt: stack.UpdatedAt,
URLs: urlStrs,
}
for _, res := range stack.Resources {
stEnt.Resources = append(stEnt.Resources, entStackResource{
APIVersion: res.APIVersion,
ID: res.ID.String(),
Kind: res.Kind.String(),
Name: res.Name,
})
}
return kv.Entity{
PK: kv.EncBytes(stEnt.ID),
UniqueKey: kv.Encode(kv.EncBytes(stEnt.OrgID), kv.EncBytes(stEnt.ID)),
Body: stEnt,
}, nil
}
func convertStackEntToStack(ent *entStack) (Stack, error) {
stack := Stack{
CRUDLog: influxdb.CRUDLog{
CreatedAt: ent.CreatedAt,
UpdatedAt: ent.UpdatedAt,
},
}
if err := stack.ID.Decode(ent.ID); err != nil {
return Stack{}, err
}
for _, urlStr := range ent.URLs {
u, err := url.Parse(urlStr)
if err != nil {
return Stack{}, err
}
stack.URLS = append(stack.URLS, *u)
}
for _, res := range ent.Resources {
stackRes := StackResource{
APIVersion: res.APIVersion,
Kind: Kind(res.Kind),
Name: res.Name,
}
if err := stackRes.ID.DecodeFromString(res.ID); err != nil {
return Stack{}, nil
}
stack.Resources = append(stack.Resources, stackRes)
}
return stack, nil
}

189
pkger/store_test.go Normal file
View File

@ -0,0 +1,189 @@
package pkger_test
import (
"context"
"net/url"
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/pkger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStoreKv(t *testing.T) {
inMemStore := inmem.NewKVStore()
stackStub := func(id influxdb.ID) pkger.Stack {
now := time.Time{}.Add(10 * 365 * 24 * time.Hour)
return pkger.Stack{
ID: id,
CRUDLog: influxdb.CRUDLog{
CreatedAt: now,
UpdatedAt: now.Add(time.Hour),
},
URLS: []url.URL{
newURL(t, "http://example.com"),
newURL(t, "http://abc.gov"),
},
Resources: []pkger.StackResource{
{
APIVersion: pkger.APIVersion,
ID: 9000,
Kind: pkger.KindBucket,
Name: "buzz lightyear",
},
{
APIVersion: pkger.APIVersion,
ID: 333,
Kind: pkger.KindBucket,
Name: "beyond",
},
},
}
}
t.Run("create a stack", func(t *testing.T) {
defer inMemStore.Flush(context.Background())
storeKV := pkger.NewStoreKV(inMemStore)
orgID := influxdb.ID(3)
seedEntities(t, storeKV, orgID, pkger.Stack{
ID: 1,
})
t.Run("with no ID collisions creates successfully", func(t *testing.T) {
expected := stackStub(3)
err := storeKV.CreateStack(context.Background(), orgID, expected)
require.NoError(t, err)
readStackEqual(t, storeKV, expected)
})
t.Run("with ID collisions fails with conflict error", func(t *testing.T) {
for _, id := range []influxdb.ID{2, 3} {
err := storeKV.CreateStack(context.Background(), orgID, pkger.Stack{ID: 1})
require.Errorf(t, err, "id=%d", id)
assert.Equalf(t, influxdb.EConflict, influxdb.ErrorCode(err), "id=%d", id)
}
})
})
t.Run("read a stack", func(t *testing.T) {
defer inMemStore.Flush(context.Background())
storeKV := pkger.NewStoreKV(inMemStore)
orgID := influxdb.ID(3)
expected := stackStub(1)
seedEntities(t, storeKV, orgID, expected)
t.Run("with valid ID returns stack successfully", func(t *testing.T) {
readStackEqual(t, storeKV, expected)
})
t.Run("when no match found fails with not found error", func(t *testing.T) {
unmatchedID := influxdb.ID(3000)
_, err := storeKV.ReadStackByID(context.Background(), unmatchedID)
require.Error(t, err)
assert.Equal(t, influxdb.ENotFound, influxdb.ErrorCode(err))
})
})
t.Run("update a stack", func(t *testing.T) {
defer inMemStore.Flush(context.Background())
storeKV := pkger.NewStoreKV(inMemStore)
orgID := influxdb.ID(3)
expected := stackStub(1)
seedEntities(t, storeKV, orgID, expected)
t.Run("with valid ID updates stack successfully", func(t *testing.T) {
updateStack := expected
updateStack.Resources = append(updateStack.Resources, pkger.StackResource{
APIVersion: pkger.APIVersion,
ID: 333,
Kind: pkger.KindBucket,
Name: "beyond",
})
err := storeKV.UpdateStack(context.Background(), orgID, updateStack)
require.NoError(t, err)
readStackEqual(t, storeKV, updateStack)
})
t.Run("when no match found fails with not found error", func(t *testing.T) {
unmatchedID := influxdb.ID(3000)
err := storeKV.UpdateStack(context.Background(), orgID, pkger.Stack{ID: unmatchedID})
require.Error(t, err)
assert.Equalf(t, influxdb.ENotFound, influxdb.ErrorCode(err), "err: %s", err)
})
})
t.Run("delete a stack", func(t *testing.T) {
defer inMemStore.Flush(context.Background())
storeKV := pkger.NewStoreKV(inMemStore)
orgID := influxdb.ID(3)
expected := stackStub(1)
seedEntities(t, storeKV, orgID, expected)
t.Run("with valid ID deletes stack successfully", func(t *testing.T) {
err := storeKV.DeleteStack(context.Background(), expected.ID)
require.NoError(t, err)
_, err = storeKV.ReadStackByID(context.Background(), expected.ID)
require.Error(t, err)
errCodeEqual(t, influxdb.ENotFound, err)
})
t.Run("when no match found fails with not found error", func(t *testing.T) {
unmatchedID := influxdb.ID(3000)
err := storeKV.DeleteStack(context.Background(), unmatchedID)
require.Error(t, err)
errCodeEqual(t, influxdb.ENotFound, err)
})
})
}
func readStackEqual(t *testing.T, store pkger.Store, expected pkger.Stack) {
t.Helper()
stack, err := store.ReadStackByID(context.Background(), expected.ID)
require.NoError(t, err)
assert.Equal(t, expected, stack)
}
func errCodeEqual(t *testing.T, expected string, actual error) {
t.Helper()
assert.Equalf(t, expected, influxdb.ErrorCode(actual), "err: %s", actual)
}
func seedEntities(t *testing.T, store pkger.Store, orgID influxdb.ID, first pkger.Stack, rest ...pkger.Stack) {
t.Helper()
for _, st := range append(rest, first) {
err := store.CreateStack(context.Background(), orgID, st)
require.NoError(t, err)
}
}
func newURL(t *testing.T, rawurl string) url.URL {
t.Helper()
u, err := url.Parse(rawurl)
require.NoError(t, err)
return *u
}