From 097d761c3a3c77011f8134dc39c7c1c3a15f6dfa Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Wed, 29 Apr 2020 21:50:52 -0700 Subject: [PATCH] feat(pkger): extend store with stack lists operation references: #17544 --- pkger/service.go | 1 + pkger/service_auth.go | 8 +++ pkger/service_logging.go | 25 ++++++- pkger/service_metrics.go | 6 ++ pkger/service_test.go | 4 ++ pkger/service_tracing.go | 14 ++++ pkger/store.go | 88 ++++++++++++++++++++++++ pkger/store_test.go | 140 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 285 insertions(+), 1 deletion(-) diff --git a/pkger/service.go b/pkger/service.go index 950fbb67e1..2b44631335 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -199,6 +199,7 @@ func WithVariableSVC(varSVC influxdb.VariableService) ServiceSetterFn { // Store is the storage behavior the Service depends on. type Store interface { CreateStack(ctx context.Context, stack Stack) error + ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) UpdateStack(ctx context.Context, stack Stack) error DeleteStack(ctx context.Context, id influxdb.ID) error diff --git a/pkger/service_auth.go b/pkger/service_auth.go index e257a6e439..f871672a4a 100644 --- a/pkger/service_auth.go +++ b/pkger/service_auth.go @@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta return s.next.InitStack(ctx, userID, newStack) } +func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction) + if err != nil { + return nil, err + } + return s.next.ListStacks(ctx, orgID, f) +} + func (s *authMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) { return s.next.CreatePkg(ctx, setters...) } diff --git a/pkger/service_logging.go b/pkger/service_logging.go index 4270ff1ecc..65cb1edfec 100644 --- a/pkger/service_logging.go +++ b/pkger/service_logging.go @@ -34,15 +34,38 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack s.logger.Error( "failed to init stack", zap.Error(err), - zap.Duration("took", time.Since(start)), zap.Stringer("orgID", newStack.OrgID), zap.Stringer("userID", userID), zap.Strings("urls", newStack.URLs), + zap.Duration("took", time.Since(start)), ) }(time.Now()) return s.next.InitStack(ctx, userID, newStack) } +func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) { + defer func(start time.Time) { + if err == nil { + return + } + + var stackIDs []string + for _, id := range f.StackIDs { + stackIDs = append(stackIDs, id.String()) + } + + s.logger.Error( + "failed to list stacks", + zap.Error(err), + zap.Stringer("orgID", orgID), + zap.Strings("stackIDs", stackIDs), + zap.Strings("names", f.Names), + zap.Duration("took", time.Since(start)), + ) + }(time.Now()) + return s.next.ListStacks(ctx, orgID, f) +} + func (s *loggingMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) { defer func(start time.Time) { dur := zap.Duration("took", time.Since(start)) diff --git a/pkger/service_metrics.go b/pkger/service_metrics.go index 49fcd1dccc..e573b6ed59 100644 --- a/pkger/service_metrics.go +++ b/pkger/service_metrics.go @@ -33,6 +33,12 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack return stack, rec(err) } +func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + rec := s.rec.Record("list_stacks") + stacks, err := s.next.ListStacks(ctx, orgID, f) + return stacks, rec(err) +} + func (s *mwMetrics) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) { rec := s.rec.Record("create_pkg") pkg, err := s.next.CreatePkg(ctx, setters...) diff --git a/pkger/service_test.go b/pkger/service_test.go index fe72c0d296..2d98354e1b 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -3234,6 +3234,10 @@ func (s *fakeStore) CreateStack(ctx context.Context, stack Stack) error { panic("not implemented") } +func (s *fakeStore) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + panic("not implemented") +} + func (s *fakeStore) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) { panic("not implemented") } diff --git a/pkger/service_tracing.go b/pkger/service_tracing.go index ad7eaf0223..2c5e3304e7 100644 --- a/pkger/service_tracing.go +++ b/pkger/service_tracing.go @@ -3,6 +3,8 @@ package pkger import ( "context" + "github.com/opentracing/opentracing-go/log" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/tracing" ) @@ -26,6 +28,18 @@ func (s *traceMW) InitStack(ctx context.Context, userID influxdb.ID, newStack St return s.next.InitStack(ctx, userID, newStack) } +func (s *traceMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ListStacks") + defer span.Finish() + + stacks, err := s.next.ListStacks(ctx, orgID, f) + span.LogFields( + log.String("org_id", orgID.String()), + log.Int("num_stacks", len(stacks)), + ) + return stacks, err +} + func (s *traceMW) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (pkg *Pkg, err error) { span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "CreatePkg") defer span.Finish() diff --git a/pkger/store.go b/pkger/store.go index 61e830d348..6b8ff74033 100644 --- a/pkger/store.go +++ b/pkger/store.go @@ -1,6 +1,7 @@ package pkger import ( + "bytes" "context" "encoding/json" "time" @@ -75,6 +76,93 @@ func (s *StoreKV) CreateStack(ctx context.Context, stack Stack) error { return s.put(ctx, stack, kv.PutNew()) } +// ListStacks returns a list of stacks. +func (s *StoreKV) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { + if len(f.StackIDs) > 0 && len(f.Names) == 0 { + return s.listStacksByID(ctx, orgID, f.StackIDs) + } + + filterFn, err := storeListFilterFn(orgID, f) + if err != nil { + return nil, err + } + + var stacks []Stack + err = s.kvStore.View(ctx, func(tx kv.Tx) error { + return s.indexBase.Find(ctx, tx, kv.FindOpts{ + CaptureFn: func(key []byte, decodedVal interface{}) error { + stack, err := convertStackEntToStack(decodedVal.(*entStack)) + if err != nil { + return err + } + stacks = append(stacks, stack) + return nil + }, + FilterEntFn: func(key []byte, decodedVal interface{}) bool { + st := decodedVal.(*entStack) + return filterFn(st) + }, + }) + }) + if err != nil { + return nil, err + } + return stacks, nil +} + +func storeListFilterFn(orgID influxdb.ID, f ListFilter) (func(*entStack) bool, error) { + orgIDEncoded, err := orgID.Encode() + if err != nil { + return nil, err + } + + mIDs := make(map[string]bool) + for _, id := range f.StackIDs { + b, err := id.Encode() + if err != nil { + return nil, err + } + mIDs[string(b)] = true + } + + mNames := make(map[string]bool) + for _, name := range f.Names { + mNames[name] = true + } + + optionalFieldFilterFn := func(ent *entStack) bool { + id := string(ent.ID) + if len(mIDs) > 0 || len(mNames) > 0 { + return mIDs[id] || mNames[ent.Name] + } + return true + } + return func(st *entStack) bool { + return bytes.Equal(orgIDEncoded, st.OrgID) && optionalFieldFilterFn(st) + }, nil +} + +func (s *StoreKV) listStacksByID(ctx context.Context, orgID influxdb.ID, stackIDs []influxdb.ID) ([]Stack, error) { + var stacks []Stack + for _, id := range stackIDs { + st, err := s.ReadStackByID(ctx, id) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + // since the stackIDs are a filter, if it is not found, we just continue + // on. If the user wants to verify the existence of a particular stack + // then it would be upon them to use the ReadByID call. + continue + } + if err != nil { + return nil, err + } + if orgID != st.OrgID { + continue + } + stacks = append(stacks, st) + } + return stacks, nil +} + // ReadStackByID reads a stack by the provided ID. func (s *StoreKV) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) { var stack Stack diff --git a/pkger/store_test.go b/pkger/store_test.go index 1120234920..6c30d47b0b 100644 --- a/pkger/store_test.go +++ b/pkger/store_test.go @@ -83,6 +83,146 @@ func TestStoreKV(t *testing.T) { }) }) + t.Run("list stacks", func(t *testing.T) { + defer inMemStore.Flush(context.Background()) + + storeKV := pkger.NewStoreKV(inMemStore) + + const orgID1 = 1 + const orgID2 = 2 + seedEntities(t, storeKV, + pkger.Stack{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + pkger.Stack{ + ID: 2, + OrgID: orgID2, + Name: "first_name", + }, + pkger.Stack{ + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + pkger.Stack{ + ID: 4, + OrgID: orgID2, + Name: "second_name", + }, + ) + + tests := []struct { + name string + orgID influxdb.ID + filter pkger.ListFilter + expected []pkger.Stack + }{ + { + name: "by org id", + orgID: orgID1, + expected: []pkger.Stack{ + { + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + { + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + }, + }, + { + name: "by stack ids", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{1, 3}, + }, + expected: []pkger.Stack{ + { + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + { + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + }, + }, + { + name: "by stack ids skips ids that belong to different organization", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{1, 2, 4}, + }, + expected: []pkger.Stack{{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }}, + }, + { + name: "stack ids that do not exist are skipped", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{1, 9000}, + }, + expected: []pkger.Stack{{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }}, + }, + { + name: "by name", + orgID: orgID1, + filter: pkger.ListFilter{ + Names: []string{"first_name"}, + }, + expected: []pkger.Stack{{ + ID: 1, + OrgID: orgID1, + Name: "first_name", + }}, + }, + { + name: "by name and id", + orgID: orgID1, + filter: pkger.ListFilter{ + StackIDs: []influxdb.ID{3}, + Names: []string{"first_name"}, + }, + expected: []pkger.Stack{ + { + ID: 1, + OrgID: orgID1, + Name: "first_name", + }, + { + ID: 3, + OrgID: orgID1, + Name: "second_name", + }, + }, + }, + } + + for _, tt := range tests { + fn := func(t *testing.T) { + stacks, err := storeKV.ListStacks(context.Background(), tt.orgID, tt.filter) + require.NoError(t, err) + assert.Equal(t, tt.expected, stacks) + } + + t.Run(tt.name, fn) + } + }) + t.Run("read a stack", func(t *testing.T) { defer inMemStore.Flush(context.Background())