From b274e15eacb2cf6c8915a9f75caf63d69fb2aac1 Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Fri, 30 Oct 2020 12:06:24 +0000 Subject: [PATCH] fix(telegraf): support pagination parameters when listing --- cmd/influx/telegraf.go | 8 ++-- dbrp/service.go | 48 +++++++++++----------- http/telegraf.go | 15 ++----- kv/index.go | 6 +-- kv/migration/migration.go | 12 +++--- kv/store.go | 4 +- kv/urm.go | 6 +-- paging.go | 14 +++++++ secret/storage.go | 9 +++-- telegraf.go | 1 - telegraf/service/telegraf.go | 37 +++++++++++++---- telegraf/service/testing/testing.go | 62 ++++++++++++++++++++++++++++- tenant/storage_urm.go | 21 +++------- testing/index.go | 10 ++--- 14 files changed, 166 insertions(+), 87 deletions(-) diff --git a/cmd/influx/telegraf.go b/cmd/influx/telegraf.go index 2144f44fff..45bd5aaa4a 100644 --- a/cmd/influx/telegraf.go +++ b/cmd/influx/telegraf.go @@ -96,10 +96,10 @@ func (b *cmdTelegrafBuilder) listRunE(cmd *cobra.Command, args []string) error { return b.writeTelegrafConfig(cfg) } - cfgs, _, err := svc.FindTelegrafConfigs(context.Background(), influxdb.TelegrafConfigFilter{ - OrgID: &orgID, - UserResourceMappingFilter: influxdb.UserResourceMappingFilter{ResourceType: influxdb.TelegrafsResourceType}, - }) + cfgs, _, err := svc.FindTelegrafConfigs(context.Background(), + influxdb.TelegrafConfigFilter{ + OrgID: &orgID, + }) if err != nil { return err } diff --git a/dbrp/service.go b/dbrp/service.go index d3dc911557..86ecf5c3d3 100644 --- a/dbrp/service.go +++ b/dbrp/service.go @@ -28,7 +28,6 @@ import ( "bytes" "context" "encoding/json" - "fmt" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kv" @@ -160,38 +159,39 @@ func (s *Service) unsetDefault(tx kv.Tx, compKey []byte) error { // getFirstBut returns the first element in the db/rp index (not accounting for the `skipID`). // If the length of the returned ID is 0, it means no element was found. // The skip value is useful, for instance, if one wants to delete an element based on the result of this operation. -func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) ([]byte, error) { - stop := fmt.Errorf("stop") - var next []byte - if err := s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) error { +func (s *Service) getFirstBut(tx kv.Tx, compKey []byte, skipID []byte) (next []byte, err error) { + err = s.byOrgAndDatabase.Walk(context.Background(), tx, compKey, func(k, v []byte) (bool, error) { if bytes.Equal(skipID, k) { - return nil + return true, nil } + next = k - return stop - }); err != nil && err != stop { - return nil, ErrInternalService(err) - } - return next, nil + + return false, nil + }) + return } // isDBRPUnique verifies if the triple orgID-database-retention-policy is unique. func (s *Service) isDBRPUnique(ctx context.Context, m influxdb.DBRPMappingV2) error { return s.store.View(ctx, func(tx kv.Tx) error { - return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) error { + return s.byOrgAndDatabase.Walk(ctx, tx, composeForeignKey(m.OrganizationID, m.Database), func(k, v []byte) (bool, error) { dbrp := &influxdb.DBRPMappingV2{} if err := json.Unmarshal(v, dbrp); err != nil { - return ErrInternalService(err) + return false, ErrInternalService(err) } + if dbrp.ID == m.ID { // Corner case. // This is the very same DBRP, just skip it! - return nil + return true, nil } + if dbrp.RetentionPolicy == m.RetentionPolicy { - return ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists") + return false, ErrDBRPAlreadyExists("another DBRP mapping with same orgID, db, and rp exists") } - return nil + + return true, nil }) }) } @@ -254,22 +254,23 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte } ms := []*influxdb.DBRPMappingV2{} - add := func(tx kv.Tx) func(k, v []byte) error { - return func(k, v []byte) error { + add := func(tx kv.Tx) func(k, v []byte) (bool, error) { + return func(k, v []byte) (bool, error) { m := influxdb.DBRPMappingV2{} if err := json.Unmarshal(v, &m); err != nil { - return ErrInternalService(err) + return false, ErrInternalService(err) } // Updating the Default field must be done before filtering. defID, err := get(tx, m.OrganizationID, m.Database) if err != nil { - return ErrInternalService(err) + return false, ErrInternalService(err) } + m.Default = m.ID == *defID if filterFunc(&m, filter) { ms = append(ms, &m) } - return nil + return true, nil } } @@ -303,7 +304,8 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte if err != nil { return ErrInternalService(err) } - return add(tx)(defID, v) + _, err = add(tx)(defID, v) + return err } } return s.byOrgAndDatabase.Walk(ctx, tx, compKey, add(tx)) @@ -318,7 +320,7 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte } for k, v := cur.First(); k != nil; k, v = cur.Next() { - if err := add(tx)(k, v); err != nil { + if _, err := add(tx)(k, v); err != nil { return err } } diff --git a/http/telegraf.go b/http/telegraf.go index c06dd0609f..43c8b483a5 100644 --- a/http/telegraf.go +++ b/http/telegraf.go @@ -278,12 +278,8 @@ func (h *TelegrafHandler) handleGetTelegraf(w http.ResponseWriter, r *http.Reque func decodeTelegrafConfigFilter(ctx context.Context, r *http.Request) (*influxdb.TelegrafConfigFilter, error) { f := &influxdb.TelegrafConfigFilter{} - urm, err := decodeUserResourceMappingFilter(ctx, r, influxdb.TelegrafsResourceType) - if err == nil { - f.UserResourceMappingFilter = *urm - } - q := r.URL.Query() + if orgIDStr := q.Get("orgID"); orgIDStr != "" { orgID, err := influxdb.IDFromString(orgIDStr) if err != nil { @@ -297,7 +293,8 @@ func decodeTelegrafConfigFilter(ctx context.Context, r *http.Request) (*influxdb } else if orgNameStr := q.Get("org"); orgNameStr != "" { f.Organization = &orgNameStr } - return f, err + + return f, nil } // handlePostTelegraf is the HTTP handler for the POST /api/v2/telegrafs route. @@ -445,12 +442,6 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f influxdb.Te if f.Organization != nil { params = append(params, [2]string{"organization", *f.Organization}) } - if f.ResourceID != 0 { - params = append(params, [2]string{"resourceID", f.ResourceID.String()}) - } - if f.UserID != 0 { - params = append(params, [2]string{"userID", f.UserID.String()}) - } var resp struct { Configs []*influxdb.TelegrafConfig `json:"configurations"` diff --git a/kv/index.go b/kv/index.go index 262de2e681..4ace27272a 100644 --- a/kv/index.go +++ b/kv/index.go @@ -222,7 +222,7 @@ func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Buck for i, value := range values { if value != nil { - if err := visit(keys[i], value); err != nil { + if cont, err := visit(keys[i], value); !cont || err != nil { return err } } @@ -390,9 +390,9 @@ func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, err return err } - return WalkCursor(ctx, cursor, func(k, v []byte) error { + return WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) { kvs = append(kvs, [2][]byte{k, v}) - return nil + return true, nil }) }) } diff --git a/kv/migration/migration.go b/kv/migration/migration.go index 8f2591b32b..a911951ec8 100644 --- a/kv/migration/migration.go +++ b/kv/migration/migration.go @@ -253,24 +253,24 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb return err } - return kv.WalkCursor(ctx, cursor, func(k, v []byte) error { + return kv.WalkCursor(ctx, cursor, func(k, v []byte) (bool, error) { var id influxdb.ID if err := id.Decode(k); err != nil { - return fmt.Errorf("decoding migration id: %w", err) + return false, fmt.Errorf("decoding migration id: %w", err) } var migration Migration if err := json.Unmarshal(v, &migration); err != nil { - return err + return false, err } idx := int(id) - 1 if idx >= len(m.Specs) { - return fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound) + return false, fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound) } if spec := m.Specs[idx]; spec.MigrationName() != migration.Name { - return fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name) + return false, fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name) } if migration.FinishedAt != nil { @@ -279,7 +279,7 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb fn(id, migration) - return nil + return true, nil }) }); err != nil { return fmt.Errorf("reading migrations: %w", err) diff --git a/kv/store.go b/kv/store.go index 32792e2bd4..b31e21542b 100644 --- a/kv/store.go +++ b/kv/store.go @@ -234,7 +234,7 @@ func WithCursorLimit(limit int) CursorOption { // VisitFunc is called for each k, v byte slice pair from the underlying source bucket // which are found in the index bucket for a provided foreign key. -type VisitFunc func(k, v []byte) error +type VisitFunc func(k, v []byte) (bool, error) // WalkCursor consumers the forward cursor call visit for each k/v pair found func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error) { @@ -245,7 +245,7 @@ func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err }() for k, v := cursor.Next(); k != nil; k, v = cursor.Next() { - if err := visit(k, v); err != nil { + if cont, err := visit(k, v); !cont || err != nil { return err } diff --git a/kv/urm.go b/kv/urm.go index ca1fde924f..d8e81f8aa4 100644 --- a/kv/urm.go +++ b/kv/urm.go @@ -136,17 +136,17 @@ func (s *Service) findUserResourceMappings(ctx context.Context, tx Tx, filter in if filter.UserID.Valid() { // urm by user index lookup userID, _ := filter.UserID.Encode() - if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error { + if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) (bool, error) { m := &influxdb.UserResourceMapping{} if err := json.Unmarshal(v, m); err != nil { - return CorruptURMError(err) + return false, CorruptURMError(err) } if filterFn(m) { ms = append(ms, m) } - return nil + return true, nil }); err != nil { return nil, err } diff --git a/paging.go b/paging.go index 7166713291..45e533f27f 100644 --- a/paging.go +++ b/paging.go @@ -34,6 +34,20 @@ type FindOptions struct { Descending bool } +// GetLimit returns the resolved limit between then limit boundaries. +// Given a limit <= 0 it returns the default limit. +func (f *FindOptions) GetLimit() int { + if f == nil || f.Limit <= 0 { + return DefaultPageSize + } + + if f.Limit > MaxPageSize { + return MaxPageSize + } + + return f.Limit +} + // DecodeFindOptions returns a FindOptions decoded from http request. func DecodeFindOptions(r *http.Request) (*FindOptions, error) { opts := &FindOptions{} diff --git a/secret/storage.go b/secret/storage.go index fe8cdfb570..bf67ae2ab4 100644 --- a/secret/storage.go +++ b/secret/storage.go @@ -81,19 +81,20 @@ func (s *Storage) ListSecret(ctx context.Context, tx kv.Tx, orgID influxdb.ID) ( keys := []string{} - err = kv.WalkCursor(ctx, cur, func(k, v []byte) error { + err = kv.WalkCursor(ctx, cur, func(k, v []byte) (bool, error) { id, key, err := decodeSecretKey(k) if err != nil { - return err + return false, err } if id != orgID { // We've reached the end of the keyspace for the provided orgID - return nil + return false, nil } keys = append(keys, key) - return nil + + return true, nil }) if err != nil { return nil, err diff --git a/telegraf.go b/telegraf.go index a09213b1dd..f93d9e722c 100644 --- a/telegraf.go +++ b/telegraf.go @@ -54,7 +54,6 @@ type TelegrafConfigStore interface { type TelegrafConfigFilter struct { OrgID *ID Organization *string - UserResourceMappingFilter } // TelegrafConfig stores telegraf config for one telegraf instance. diff --git a/telegraf/service/telegraf.go b/telegraf/service/telegraf.go index 040d56e395..6598908e96 100644 --- a/telegraf/service/telegraf.go +++ b/telegraf/service/telegraf.go @@ -155,25 +155,40 @@ func (s *Service) findTelegrafConfigByID(ctx context.Context, tx kv.Tx, id influ // Additional options provide pagination & sorting. func (s *Service) FindTelegrafConfigs(ctx context.Context, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) (tcs []*influxdb.TelegrafConfig, n int, err error) { err = s.kv.View(ctx, func(tx kv.Tx) error { - tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter) + tcs, n, err = s.findTelegrafConfigs(ctx, tx, filter, opt...) return err }) return tcs, n, err } func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter influxdb.TelegrafConfigFilter, opt ...influxdb.FindOptions) ([]*influxdb.TelegrafConfig, int, error) { - tcs := make([]*influxdb.TelegrafConfig, 0) + var ( + limit = influxdb.DefaultPageSize + offset int + count int + tcs = make([]*influxdb.TelegrafConfig, 0) + ) - visit := func(k, v []byte) error { + if len(opt) > 0 { + limit = opt[0].GetLimit() + offset = opt[0].Offset + } + + visit := func(k, v []byte) (bool, error) { var tc influxdb.TelegrafConfig if err := json.Unmarshal(v, &tc); err != nil { - return err + return false, err } - tcs = append(tcs, &tc) + // skip until offset reached + if count >= offset { + tcs = append(tcs, &tc) + } - return nil + count++ + // stop cursing when limit is reached + return len(tcs) < limit, nil } if filter.OrgID == nil { @@ -183,8 +198,14 @@ func (s *Service) findTelegrafConfigs(ctx context.Context, tx kv.Tx, filter infl return nil, 0, err } - // TODO(georgemac): convert find options into cursor options - cursor, err := bucket.ForwardCursor(nil) + // cursors do not support numeric offset + // but we can at least constrain the response + // size by the offset + limit since we are + // not doing any other filtering + // REMOVE this cursor option if you do any + // other filtering + + cursor, err := bucket.ForwardCursor(nil, kv.WithCursorLimit(offset+limit)) if err != nil { return nil, 0, err } diff --git a/telegraf/service/testing/testing.go b/telegraf/service/testing/testing.go index 545002fdec..82db4c8ebe 100644 --- a/telegraf/service/testing/testing.go +++ b/telegraf/service/testing/testing.go @@ -364,6 +364,7 @@ func FindTelegrafConfigs( ) { type args struct { filter influxdb.TelegrafConfigFilter + opts []influxdb.FindOptions } type wants struct { @@ -518,6 +519,65 @@ func FindTelegrafConfigs( telegrafConfigs: []*influxdb.TelegrafConfig{}, }, }, + { + name: "find with limit and offset", + fields: TelegrafConfigFields{ + IDGenerator: mock.NewIncrementingIDGenerator(oneID), + TelegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: oneID, + OrgID: fourID, + Name: "tc1", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: threeID, + OrgID: oneID, + Name: "tc3", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: fourID, + OrgID: oneID, + Name: "tc4", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + args: args{ + opts: []influxdb.FindOptions{ + {Limit: 2, Offset: 1}, + }, + }, + wants: wants{ + telegrafConfigs: []*influxdb.TelegrafConfig{ + { + ID: twoID, + OrgID: fourID, + Name: "tc2", + Config: "[[inputs.file]]\n[[inputs.mem]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + { + ID: threeID, + OrgID: oneID, + Name: "tc3", + Config: "[[inputs.cpu]]\n", + Metadata: map[string]interface{}{"buckets": []interface{}{}}, + }, + }, + }, + }, } for _, tt := range tests { @@ -526,7 +586,7 @@ func FindTelegrafConfigs( defer done() ctx := context.Background() - tcs, _, err := s.FindTelegrafConfigs(ctx, tt.args.filter) + tcs, _, err := s.FindTelegrafConfigs(ctx, tt.args.filter, tt.args.opts...) if err != nil && tt.wants.err == nil { t.Fatalf("expected errors to be nil got '%v'", err) } diff --git a/tenant/storage_urm.go b/tenant/storage_urm.go index 8146eee8bc..2e896e863a 100644 --- a/tenant/storage_urm.go +++ b/tenant/storage_urm.go @@ -3,7 +3,6 @@ package tenant import ( "context" "encoding/json" - "errors" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kv" @@ -72,37 +71,29 @@ func (s *Store) ListURMs(ctx context.Context, tx kv.Tx, filter influxdb.UserReso if filter.UserID.Valid() { var ( - errPageLimit = errors.New("page limit reached") // urm by user index lookup userID, _ = filter.UserID.Encode() seen int ) - if err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) error { + err := s.urmByUserIndex.Walk(ctx, tx, userID, func(k, v []byte) (bool, error) { m := &influxdb.UserResourceMapping{} if err := json.Unmarshal(v, m); err != nil { - return CorruptURMError(err) + return false, CorruptURMError(err) } // respect offset parameter reachedOffset := (len(opt) == 0 || seen >= opt[0].Offset) - if filterFn(m) && reachedOffset { + if reachedOffset && filterFn(m) { ms = append(ms, m) } - // respect pagination in URMs - if len(opt) > 0 && opt[0].Limit > 0 && len(ms) >= opt[0].Limit { - return errPageLimit - } - seen++ - return nil - }); err != nil && err != errPageLimit { - return nil, err - } + return (len(opt) == 0 || opt[0].Limit <= 0 || len(ms) < opt[0].Limit), nil + }) - return ms, nil + return ms, err } // for now the best we can do is use the resourceID if we have that as a forward cursor option diff --git a/testing/index.go b/testing/index.go index 0ac097b7a7..69b8d348a7 100644 --- a/testing/index.go +++ b/testing/index.go @@ -61,14 +61,14 @@ func newSomeResourceStore(t tester, ctx context.Context, store kv.SchemaStore) * func (s *someResourceStore) FindByOwner(ctx context.Context, ownerID string) (resources []someResource, err error) { err = s.store.View(ctx, func(tx kv.Tx) error { - return s.ownerIDIndex.Walk(ctx, tx, []byte(ownerID), func(k, v []byte) error { + return s.ownerIDIndex.Walk(ctx, tx, []byte(ownerID), func(k, v []byte) (bool, error) { var resource someResource if err := json.Unmarshal(v, &resource); err != nil { - return err + return false, err } resources = append(resources, resource) - return nil + return true, nil }) }) return @@ -410,12 +410,12 @@ func BenchmarkIndexWalk(b *testing.B, store kv.SchemaStore, resourceCount, fetch for i := 0; i < b.N; i++ { store.View(ctx, func(tx kv.Tx) error { - return resourceStore.ownerIDIndex.Walk(ctx, tx, []byte(fmt.Sprintf("owner %d", i%userCount)), func(k, v []byte) error { + return resourceStore.ownerIDIndex.Walk(ctx, tx, []byte(fmt.Sprintf("owner %d", i%userCount)), func(k, v []byte) (bool, error) { if k == nil || v == nil { b.Fatal("entries must not be nil") } - return nil + return true, nil }) }) }