diff --git a/cmd/influx/pkg_test.go b/cmd/influx/pkg_test.go index d088f0cea9..a6aaea98eb 100644 --- a/cmd/influx/pkg_test.go +++ b/cmd/influx/pkg_test.go @@ -6,6 +6,7 @@ import ( "errors" "io" "io/ioutil" + "net/url" "os" "path" "path/filepath" @@ -591,6 +592,10 @@ type fakePkgSVC struct { applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, error) } +func (f *fakePkgSVC) InitStack(ctx context.Context, orgID, userID influxdb.ID, urls ...url.URL) (pkger.Stack, error) { + panic("not implemented") +} + func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) { if f.createFn != nil { return f.createFn(ctx, setters...) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 80e2784be8..026a82dbd3 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -926,7 +926,7 @@ spec: } assert.Equal(t, expectedMissingEnvs, sum.MissingEnvs) - sum, err = svc.Apply(timedCtx(time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{ + sum, err = svc.Apply(timedCtx(5*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{ "bkt-1-name-ref": "rucket_threeve", "check-1-name-ref": "check_threeve", "dash-1-name-ref": "dash_threeve", diff --git a/pkger/service.go b/pkger/service.go index e36c18178c..33482bae7f 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -19,14 +19,20 @@ import ( const APIVersion = "influxdata.com/v2alpha1" type ( + // Stack is an identifier for stateful application of a package(s). This stack + // will map created resources from the pkg(s) to existing resources on the + // platform. This stack is updated only after side effects of applying a pkg. + // If the pkg is applied, and no changes are had, then the stack is not updated. Stack struct { ID influxdb.ID - CreatedAt time.Time - UpdatedAt time.Time URLS []url.URL Resources []StackResource + + influxdb.CRUDLog } + // StackResource is a record for an individual resource side effect genereated from + // applying a pkg. StackResource struct { APIVersion string ID influxdb.ID @@ -143,6 +149,7 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn { } } +// Store is the storage behavior the Service depends on. type Store interface { CreateStack(ctx context.Context, orgID influxdb.ID, stack Stack) error ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) @@ -203,9 +210,25 @@ func NewService(opts ...ServiceSetterFn) *Service { } } +// InitStack will create a new stack for the given user and its given org. The stack can be created +// with urls that point to the location of packages that are included as part of the stack when +// it is applied. func (s *Service) InitStack(ctx context.Context, orgID, userID influxdb.ID, urls ...url.URL) (Stack, error) { + now := s.timeGen.Now() + stack := Stack{ + ID: s.idGen.ID(), + CRUDLog: influxdb.CRUDLog{ + CreatedAt: now, + UpdatedAt: now, + }, + URLS: urls, } + + if err := s.store.CreateStack(ctx, orgID, stack); err != nil { + return Stack{}, err + } + return stack, nil } diff --git a/pkger/store.go b/pkger/store.go new file mode 100644 index 0000000000..cb96f4e3af --- /dev/null +++ b/pkger/store.go @@ -0,0 +1,226 @@ +package pkger + +import ( + "context" + "encoding/json" + "net/url" + "time" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/kv" +) + +type ( + entStack struct { + ID []byte `json:"id"` + OrgID []byte `json:"orgID"` + + URLs []string `json:"urls,omitempty"` + Resources []entStackResource `json:"resources,omitempty"` + + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + } + + entStackResource struct { + APIVersion string `json:"apiVersion"` + ID string `json:"id"` + Kind string `json:"kind"` + Name string `json:"name"` + } +) + +// StoreKV is a store implementation that uses a kv store backing. +type StoreKV struct { + kvStore kv.Store + indexBase *kv.IndexStore +} + +var _ Store = (*StoreKV)(nil) + +// NewStoreKV creates a new StoreKV entity. This does not initialize the store. You will +// want to init it if you want to have this init donezo at startup. If not it'll lazy +// load the buckets as they are used. +func NewStoreKV(store kv.Store) *StoreKV { + const resource = "pkg stack" + + storeKV := &StoreKV{ + kvStore: store, + } + storeKV.indexBase = &kv.IndexStore{ + Resource: resource, + EntStore: storeKV.entStoreBase(resource), + IndexStore: storeKV.indexStoreBase(resource), + } + return storeKV +} + +// Init will initialize the all required buckets for the kv store. If not called, will be +// called implicitly on first read/write operation. +func (s *StoreKV) Init(ctx context.Context) error { + return s.kvStore.Update(ctx, func(tx kv.Tx) error { + return s.indexBase.Init(ctx, tx) + }) +} + +// CreateStack will create a new stack. If collisions are found will fail. +func (s *StoreKV) CreateStack(ctx context.Context, orgID influxdb.ID, stack Stack) error { + return s.put(ctx, orgID, stack, kv.PutNew()) +} + +// ReadStackByID reads a stack by the provided ID. +func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) { + var stack Stack + err := s.kvStore.View(ctx, func(tx kv.Tx) error { + decodedEnt, err := s.indexBase.FindEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)}) + if err != nil { + return err + } + stack, err = convertStackEntToStack(decodedEnt.(*entStack)) + return err + }) + return stack, err +} + +// UpdateStack updates a stack. +func (s *StoreKV) UpdateStack(ctx context.Context, orgID influxdb.ID, stack Stack) error { + return s.put(ctx, orgID, stack, kv.PutUpdate()) +} + +// DeleteStack delets a stack by id. +func (s *StoreKV) DeleteStack(ctx context.Context, id influxdb.ID) error { + return s.kvStore.Update(ctx, func(tx kv.Tx) error { + return s.indexBase.DeleteEnt(ctx, tx, kv.Entity{PK: kv.EncID(id)}) + }) +} + +func (s *StoreKV) put(ctx context.Context, orgID influxdb.ID, stack Stack, opts ...kv.PutOptionFn) error { + ent, err := convertStackToEnt(orgID, stack) + if err != nil { + return &influxdb.Error{ + Code: influxdb.EInvalid, + Err: err, + } + } + + return s.kvStore.Update(ctx, func(tx kv.Tx) error { + return s.indexBase.Put(ctx, tx, ent, opts...) + }) +} + +func (s *StoreKV) entStoreBase(resource string) *kv.StoreBase { + var decodeEntFn kv.DecodeBucketValFn = func(key, val []byte) (keyRepeat []byte, decodedVal interface{}, err error) { + var stack entStack + return key, &stack, json.Unmarshal(val, &stack) + } + + var decValToEntFn kv.ConvertValToEntFn = func(k []byte, i interface{}) (kv.Entity, error) { + s, ok := i.(*entStack) + if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil { + return kv.Entity{}, err + } + + return kv.Entity{ + PK: kv.EncBytes(s.ID), + UniqueKey: kv.Encode(kv.EncBytes(s.OrgID), kv.EncBytes(s.ID)), + Body: s, + }, nil + } + + entityBucket := []byte("v1_pkger_stacks") + + return kv.NewStoreBase(resource, entityBucket, kv.EncIDKey, kv.EncBodyJSON, decodeEntFn, decValToEntFn) +} + +func (s *StoreKV) indexStoreBase(resource string) *kv.StoreBase { + var decValToEntFn kv.ConvertValToEntFn = func(k []byte, v interface{}) (kv.Entity, error) { + id, ok := v.(influxdb.ID) + if err := kv.IsErrUnexpectedDecodeVal(ok); err != nil { + return kv.Entity{}, err + } + + return kv.Entity{ + PK: kv.EncID(id), + UniqueKey: kv.EncBytes(k), + }, nil + } + + indexBucket := []byte("v1_pkger_stacks_index") + + return kv.NewStoreBase(resource, indexBucket, kv.EncUniqKey, kv.EncIDKey, kv.DecIndexID, decValToEntFn) +} + +func convertStackToEnt(orgID influxdb.ID, stack Stack) (kv.Entity, error) { + idBytes, err := stack.ID.Encode() + if err != nil { + return kv.Entity{}, err + } + + orgIDBytes, err := orgID.Encode() + if err != nil { + return kv.Entity{}, err + } + + urlStrs := make([]string, 0, len(stack.URLS)) + for _, u := range stack.URLS { + urlStrs = append(urlStrs, u.String()) + } + + stEnt := entStack{ + ID: idBytes, + OrgID: orgIDBytes, + CreatedAt: stack.CreatedAt, + UpdatedAt: stack.UpdatedAt, + URLs: urlStrs, + } + + for _, res := range stack.Resources { + stEnt.Resources = append(stEnt.Resources, entStackResource{ + APIVersion: res.APIVersion, + ID: res.ID.String(), + Kind: res.Kind.String(), + Name: res.Name, + }) + } + + return kv.Entity{ + PK: kv.EncBytes(stEnt.ID), + UniqueKey: kv.Encode(kv.EncBytes(stEnt.OrgID), kv.EncBytes(stEnt.ID)), + Body: stEnt, + }, nil +} + +func convertStackEntToStack(ent *entStack) (Stack, error) { + stack := Stack{ + CRUDLog: influxdb.CRUDLog{ + CreatedAt: ent.CreatedAt, + UpdatedAt: ent.UpdatedAt, + }, + } + if err := stack.ID.Decode(ent.ID); err != nil { + return Stack{}, err + } + + for _, urlStr := range ent.URLs { + u, err := url.Parse(urlStr) + if err != nil { + return Stack{}, err + } + stack.URLS = append(stack.URLS, *u) + } + + for _, res := range ent.Resources { + stackRes := StackResource{ + APIVersion: res.APIVersion, + Kind: Kind(res.Kind), + Name: res.Name, + } + if err := stackRes.ID.DecodeFromString(res.ID); err != nil { + return Stack{}, nil + } + + stack.Resources = append(stack.Resources, stackRes) + } + + return stack, nil +} diff --git a/pkger/store_test.go b/pkger/store_test.go new file mode 100644 index 0000000000..569e4caa62 --- /dev/null +++ b/pkger/store_test.go @@ -0,0 +1,189 @@ +package pkger_test + +import ( + "context" + "net/url" + "testing" + "time" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/inmem" + "github.com/influxdata/influxdb/pkger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStoreKv(t *testing.T) { + inMemStore := inmem.NewKVStore() + + stackStub := func(id influxdb.ID) pkger.Stack { + now := time.Time{}.Add(10 * 365 * 24 * time.Hour) + return pkger.Stack{ + ID: id, + CRUDLog: influxdb.CRUDLog{ + CreatedAt: now, + UpdatedAt: now.Add(time.Hour), + }, + URLS: []url.URL{ + newURL(t, "http://example.com"), + newURL(t, "http://abc.gov"), + }, + Resources: []pkger.StackResource{ + { + APIVersion: pkger.APIVersion, + ID: 9000, + Kind: pkger.KindBucket, + Name: "buzz lightyear", + }, + { + APIVersion: pkger.APIVersion, + ID: 333, + Kind: pkger.KindBucket, + Name: "beyond", + }, + }, + } + } + + t.Run("create a stack", func(t *testing.T) { + defer inMemStore.Flush(context.Background()) + + storeKV := pkger.NewStoreKV(inMemStore) + orgID := influxdb.ID(3) + + seedEntities(t, storeKV, orgID, pkger.Stack{ + ID: 1, + }) + + t.Run("with no ID collisions creates successfully", func(t *testing.T) { + expected := stackStub(3) + + err := storeKV.CreateStack(context.Background(), orgID, expected) + require.NoError(t, err) + + readStackEqual(t, storeKV, expected) + }) + + t.Run("with ID collisions fails with conflict error", func(t *testing.T) { + for _, id := range []influxdb.ID{2, 3} { + err := storeKV.CreateStack(context.Background(), orgID, pkger.Stack{ID: 1}) + require.Errorf(t, err, "id=%d", id) + assert.Equalf(t, influxdb.EConflict, influxdb.ErrorCode(err), "id=%d", id) + } + }) + }) + + t.Run("read a stack", func(t *testing.T) { + defer inMemStore.Flush(context.Background()) + + storeKV := pkger.NewStoreKV(inMemStore) + orgID := influxdb.ID(3) + + expected := stackStub(1) + + seedEntities(t, storeKV, orgID, expected) + + t.Run("with valid ID returns stack successfully", func(t *testing.T) { + readStackEqual(t, storeKV, expected) + }) + + t.Run("when no match found fails with not found error", func(t *testing.T) { + unmatchedID := influxdb.ID(3000) + _, err := storeKV.ReadStackByID(context.Background(), unmatchedID) + require.Error(t, err) + assert.Equal(t, influxdb.ENotFound, influxdb.ErrorCode(err)) + }) + }) + + t.Run("update a stack", func(t *testing.T) { + defer inMemStore.Flush(context.Background()) + + storeKV := pkger.NewStoreKV(inMemStore) + orgID := influxdb.ID(3) + + expected := stackStub(1) + + seedEntities(t, storeKV, orgID, expected) + + t.Run("with valid ID updates stack successfully", func(t *testing.T) { + updateStack := expected + updateStack.Resources = append(updateStack.Resources, pkger.StackResource{ + APIVersion: pkger.APIVersion, + ID: 333, + Kind: pkger.KindBucket, + Name: "beyond", + }) + + err := storeKV.UpdateStack(context.Background(), orgID, updateStack) + require.NoError(t, err) + + readStackEqual(t, storeKV, updateStack) + }) + + t.Run("when no match found fails with not found error", func(t *testing.T) { + unmatchedID := influxdb.ID(3000) + err := storeKV.UpdateStack(context.Background(), orgID, pkger.Stack{ID: unmatchedID}) + require.Error(t, err) + assert.Equalf(t, influxdb.ENotFound, influxdb.ErrorCode(err), "err: %s", err) + }) + }) + + t.Run("delete a stack", func(t *testing.T) { + defer inMemStore.Flush(context.Background()) + + storeKV := pkger.NewStoreKV(inMemStore) + orgID := influxdb.ID(3) + + expected := stackStub(1) + + seedEntities(t, storeKV, orgID, expected) + + t.Run("with valid ID deletes stack successfully", func(t *testing.T) { + err := storeKV.DeleteStack(context.Background(), expected.ID) + require.NoError(t, err) + + _, err = storeKV.ReadStackByID(context.Background(), expected.ID) + require.Error(t, err) + errCodeEqual(t, influxdb.ENotFound, err) + }) + + t.Run("when no match found fails with not found error", func(t *testing.T) { + unmatchedID := influxdb.ID(3000) + err := storeKV.DeleteStack(context.Background(), unmatchedID) + require.Error(t, err) + errCodeEqual(t, influxdb.ENotFound, err) + }) + }) +} + +func readStackEqual(t *testing.T, store pkger.Store, expected pkger.Stack) { + t.Helper() + + stack, err := store.ReadStackByID(context.Background(), expected.ID) + require.NoError(t, err) + assert.Equal(t, expected, stack) +} + +func errCodeEqual(t *testing.T, expected string, actual error) { + t.Helper() + + assert.Equalf(t, expected, influxdb.ErrorCode(actual), "err: %s", actual) +} + +func seedEntities(t *testing.T, store pkger.Store, orgID influxdb.ID, first pkger.Stack, rest ...pkger.Stack) { + t.Helper() + + for _, st := range append(rest, first) { + err := store.CreateStack(context.Background(), orgID, st) + require.NoError(t, err) + } +} + +func newURL(t *testing.T, rawurl string) url.URL { + t.Helper() + + u, err := url.Parse(rawurl) + require.NoError(t, err) + + return *u +}