feat(pkger): extend store with stack lists operation

references: #17544
pull/17925/head^2
Johnny Steenbergen 2020-04-29 21:50:52 -07:00 committed by Johnny Steenbergen
parent 67307d804e
commit 097d761c3a
8 changed files with 285 additions and 1 deletions

View File

@ -199,6 +199,7 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn {
// Store is the storage behavior the Service depends on.
type Store interface {
CreateStack(ctx context.Context, stack Stack) error
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)
ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error)
UpdateStack(ctx context.Context, stack Stack) error
DeleteStack(ctx context.Context, id influxdb.ID) error

View File

@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta
return s.next.InitStack(ctx, userID, newStack)
}
func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction)
if err != nil {
return nil, err
}
return s.next.ListStacks(ctx, orgID, f)
}
func (s *authMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) {
return s.next.CreatePkg(ctx, setters...)
}

View File

@ -34,15 +34,38 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack
s.logger.Error(
"failed to init stack",
zap.Error(err),
zap.Duration("took", time.Since(start)),
zap.Stringer("orgID", newStack.OrgID),
zap.Stringer("userID", userID),
zap.Strings("urls", newStack.URLs),
zap.Duration("took", time.Since(start)),
)
}(time.Now())
return s.next.InitStack(ctx, userID, newStack)
}
func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) {
defer func(start time.Time) {
if err == nil {
return
}
var stackIDs []string
for _, id := range f.StackIDs {
stackIDs = append(stackIDs, id.String())
}
s.logger.Error(
"failed to list stacks",
zap.Error(err),
zap.Stringer("orgID", orgID),
zap.Strings("stackIDs", stackIDs),
zap.Strings("names", f.Names),
zap.Duration("took", time.Since(start)),
)
}(time.Now())
return s.next.ListStacks(ctx, orgID, f)
}
func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))

View File

@ -33,6 +33,12 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack
return stack, rec(err)
}
func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
rec := s.rec.Record("list_stacks")
stacks, err := s.next.ListStacks(ctx, orgID, f)
return stacks, rec(err)
}
func (s *mwMetrics) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) {
rec := s.rec.Record("create_pkg")
pkg, err := s.next.CreatePkg(ctx, setters...)

View File

@ -3234,6 +3234,10 @@ func (s *fakeStore) CreateStack(ctx context.Context, stack Stack) error {
panic("not implemented")
}
func (s *fakeStore) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
panic("not implemented")
}
func (s *fakeStore) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
panic("not implemented")
}

View File

@ -3,6 +3,8 @@ package pkger
import (
"context"
"github.com/opentracing/opentracing-go/log"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
)
@ -26,6 +28,18 @@ func (s *traceMW) InitStack(ctx context.Context, userID influxdb.ID, newStack St
return s.next.InitStack(ctx, userID, newStack)
}
func (s *traceMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ListStacks")
defer span.Finish()
stacks, err := s.next.ListStacks(ctx, orgID, f)
span.LogFields(
log.String("org_id", orgID.String()),
log.Int("num_stacks", len(stacks)),
)
return stacks, err
}
func (s *traceMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) {
span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "CreatePkg")
defer span.Finish()

View File

@ -1,6 +1,7 @@
package pkger
import (
"bytes"
"context"
"encoding/json"
"time"
@ -75,6 +76,93 @@ func (s *StoreKV) CreateStack(ctx context.Context, stack Stack) error {
return s.put(ctx, stack, kv.PutNew())
}
// ListStacks returns a list of stacks.
func (s *StoreKV) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
if len(f.StackIDs) > 0 && len(f.Names) == 0 {
return s.listStacksByID(ctx, orgID, f.StackIDs)
}
filterFn, err := storeListFilterFn(orgID, f)
if err != nil {
return nil, err
}
var stacks []Stack
err = s.kvStore.View(ctx, func(tx kv.Tx) error {
return s.indexBase.Find(ctx, tx, kv.FindOpts{
CaptureFn: func(key []byte, decodedVal interface{}) error {
stack, err := convertStackEntToStack(decodedVal.(*entStack))
if err != nil {
return err
}
stacks = append(stacks, stack)
return nil
},
FilterEntFn: func(key []byte, decodedVal interface{}) bool {
st := decodedVal.(*entStack)
return filterFn(st)
},
})
})
if err != nil {
return nil, err
}
return stacks, nil
}
func storeListFilterFn(orgID influxdb.ID, f ListFilter) (func(*entStack) bool, error) {
orgIDEncoded, err := orgID.Encode()
if err != nil {
return nil, err
}
mIDs := make(map[string]bool)
for _, id := range f.StackIDs {
b, err := id.Encode()
if err != nil {
return nil, err
}
mIDs[string(b)] = true
}
mNames := make(map[string]bool)
for _, name := range f.Names {
mNames[name] = true
}
optionalFieldFilterFn := func(ent *entStack) bool {
id := string(ent.ID)
if len(mIDs) > 0 || len(mNames) > 0 {
return mIDs[id] || mNames[ent.Name]
}
return true
}
return func(st *entStack) bool {
return bytes.Equal(orgIDEncoded, st.OrgID) && optionalFieldFilterFn(st)
}, nil
}
func (s *StoreKV) listStacksByID(ctx context.Context, orgID influxdb.ID, stackIDs []influxdb.ID) ([]Stack, error) {
var stacks []Stack
for _, id := range stackIDs {
st, err := s.ReadStackByID(ctx, id)
if influxdb.ErrorCode(err) == influxdb.ENotFound {
// since the stackIDs are a filter, if it is not found, we just continue
// on. If the user wants to verify the existence of a particular stack
// then it would be upon them to use the ReadByID call.
continue
}
if err != nil {
return nil, err
}
if orgID != st.OrgID {
continue
}
stacks = append(stacks, st)
}
return stacks, nil
}
// ReadStackByID reads a stack by the provided ID.
func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
var stack Stack

View File

@ -83,6 +83,146 @@ func TestStoreKV(t *testing.T) {
})
})
t.Run("list stacks", func(t *testing.T) {
defer inMemStore.Flush(context.Background())
storeKV := pkger.NewStoreKV(inMemStore)
const orgID1 = 1
const orgID2 = 2
seedEntities(t, storeKV,
pkger.Stack{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
pkger.Stack{
ID: 2,
OrgID: orgID2,
Name: "first_name",
},
pkger.Stack{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
pkger.Stack{
ID: 4,
OrgID: orgID2,
Name: "second_name",
},
)
tests := []struct {
name string
orgID influxdb.ID
filter pkger.ListFilter
expected []pkger.Stack
}{
{
name: "by org id",
orgID: orgID1,
expected: []pkger.Stack{
{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
},
},
{
name: "by stack ids",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{1, 3},
},
expected: []pkger.Stack{
{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
},
},
{
name: "by stack ids skips ids that belong to different organization",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{1, 2, 4},
},
expected: []pkger.Stack{{
ID: 1,
OrgID: orgID1,
Name: "first_name",
}},
},
{
name: "stack ids that do not exist are skipped",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{1, 9000},
},
expected: []pkger.Stack{{
ID: 1,
OrgID: orgID1,
Name: "first_name",
}},
},
{
name: "by name",
orgID: orgID1,
filter: pkger.ListFilter{
Names: []string{"first_name"},
},
expected: []pkger.Stack{{
ID: 1,
OrgID: orgID1,
Name: "first_name",
}},
},
{
name: "by name and id",
orgID: orgID1,
filter: pkger.ListFilter{
StackIDs: []influxdb.ID{3},
Names: []string{"first_name"},
},
expected: []pkger.Stack{
{
ID: 1,
OrgID: orgID1,
Name: "first_name",
},
{
ID: 3,
OrgID: orgID1,
Name: "second_name",
},
},
},
}
for _, tt := range tests {
fn := func(t *testing.T) {
stacks, err := storeKV.ListStacks(context.Background(), tt.orgID, tt.filter)
require.NoError(t, err)
assert.Equal(t, tt.expected, stacks)
}
t.Run(tt.name, fn)
}
})
t.Run("read a stack", func(t *testing.T) {
defer inMemStore.Flush(context.Background())