From 825b4f8da73099fda42d8323374ad6b910c69fbb Mon Sep 17 00:00:00 2001 From: Johnny Steenbergen Date: Thu, 9 Jan 2020 09:22:41 -0800 Subject: [PATCH] fix(kv): fix issue with cursor not closing inside TX also drops the prefix search optimizations from index store as well. --- kv/check.go | 13 ++----- kv/store_base.go | 65 ++++++------------------------- kv/store_base_test.go | 13 +++++-- kv/store_index.go | 88 ++---------------------------------------- kv/store_index_test.go | 31 --------------- 5 files changed, 29 insertions(+), 181 deletions(-) diff --git a/kv/check.go b/kv/check.go index 6c70db7a08..8a8d133932 100644 --- a/kv/check.go +++ b/kv/check.go @@ -154,6 +154,9 @@ func filterChecksFn(idMap map[influxdb.ID]bool, filter influxdb.CheckFilter) fun if filter.ID != nil && c.GetID() != *filter.ID { return false } + if filter.OrgID != nil && c.GetOrgID() != *filter.OrgID { + return false + } if filter.Name != nil && c.GetName() != *filter.Name { return false } @@ -198,15 +201,6 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o filter.OrgID = &o.ID } - var prefix []byte - if filter.OrgID != nil { - encs := []EncodeFn{EncID(*filter.OrgID)} - if filter.Name != nil { - encs = append(encs, EncString(*filter.Name)) - } - prefix, _ = s.checkStore.IndexStore.EntKey(ctx, Entity{UniqueKey: Encode(encs...)}) - } - var opt influxdb.FindOptions if len(opts) > 0 { opt = opts[0] @@ -217,7 +211,6 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o Descending: opt.Descending, Offset: opt.Offset, Limit: opt.Limit, - Prefix: prefix, FilterEntFn: func(k []byte, v interface{}) bool { ch, ok := v.(influxdb.Check) if err := IsErrUnexpectedDecodeVal(ok); err != nil { diff --git a/kv/store_base.go b/kv/store_base.go index 7506abf84d..22eacd34d0 100644 --- a/kv/store_base.go +++ b/kv/store_base.go @@ -1,7 +1,6 @@ package kv import ( - "bytes" "context" "encoding/json" "errors" @@ -505,52 +504,37 @@ type iterator struct { offset int prefix []byte - seekChan <-chan struct{ k, v []byte } + nextFn func() (key, val []byte) decodeFn func(key, val []byte) (k []byte, decodedVal interface{}, err error) filterFn FilterFn } func (i *iterator) Next(ctx context.Context) (key []byte, val interface{}, err error) { - span, ctx := tracing.StartSpanFromContext(ctx) + span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() if i.limit > 0 && i.counter >= i.limit+i.offset { return nil, nil, nil } - var ( - k, vRaw []byte - nextFn func() ([]byte, []byte) - ) + var k, vRaw []byte switch { + case i.nextFn != nil: + k, vRaw = i.nextFn() case len(i.prefix) > 0: - i.seek(ctx) - nextFn = func() ([]byte, []byte) { - select { - case <-ctx.Done(): - return nil, nil - case kv := <-i.seekChan: - return kv.k, kv.v - } - } - k, vRaw = nextFn() - case i.counter == 0 && i.descending: - k, vRaw = i.cursor.Last() - nextFn = i.cursor.Prev - case i.counter == 0: - k, vRaw = i.cursor.First() - nextFn = i.cursor.Next + k, vRaw = i.cursor.Seek(i.prefix) + i.nextFn = i.cursor.Next case i.descending: - k, vRaw = i.cursor.Prev() - nextFn = i.cursor.Prev + k, vRaw = i.cursor.Last() + i.nextFn = i.cursor.Prev default: - k, vRaw = i.cursor.Next() - nextFn = i.cursor.Next + k, vRaw = i.cursor.First() + i.nextFn = i.cursor.Next } k, decodedVal, err := i.decodeFn(k, vRaw) - for ; ; k, decodedVal, err = i.decodeFn(nextFn()) { + for ; ; k, decodedVal, err = i.decodeFn(i.nextFn()) { if err != nil { return nil, nil, err } @@ -585,31 +569,6 @@ func (i *iterator) isNext(k []byte, v interface{}) bool { return true } -func (i *iterator) seek(ctx context.Context) { - if i.seekChan != nil || len(i.prefix) == 0 { - return - } - out := make(chan struct{ k, v []byte }) - i.seekChan = out - - go func() { - defer close(out) - - for k, v := i.cursor.Seek(i.prefix); bytes.HasPrefix(k, i.prefix); k, v = i.cursor.Next() { - if len(k) == 0 { - return - } - - select { - case <-ctx.Done(): - return - case out <- struct{ k, v []byte }{k: k, v: v}: - } - } - }() - -} - func IsErrUnexpectedDecodeVal(ok bool) error { if ok { return nil diff --git a/kv/store_base_test.go b/kv/store_base_test.go index eceb277b85..94c3cb3c96 100644 --- a/kv/store_base_test.go +++ b/kv/store_base_test.go @@ -284,9 +284,9 @@ func testFind(t *testing.T, fn func(t *testing.T, suffix string) (storeBase, fun expectedEnts := []kv.Entity{ newFooEnt(1, 9000, "foo_0"), - newFooEnt(2, 9000, "foo_1"), - newFooEnt(3, 9003, "foo_2"), - newFooEnt(4, 9004, "foo_3"), + newFooEnt(2000, 9000, "foo_1"), + newFooEnt(3000000, 9003, "foo_2"), + newFooEnt(4000000000, 9004, "foo_3"), } tests := []struct { @@ -330,6 +330,13 @@ func testFind(t *testing.T, fn func(t *testing.T, suffix string) (storeBase, fun }, expected: toIfaces(expectedEnts[2]), }, + { + name: "with id prefix", + opts: kv.FindOpts{ + Prefix: encodeID(t, 3000000)[:influxdb.IDLength-5], + }, + expected: toIfaces(expectedEnts[2], expectedEnts[3]), + }, } for _, tt := range tests { diff --git a/kv/store_index.go b/kv/store_index.go index 30d8c10213..f2dbbd94e5 100644 --- a/kv/store_index.go +++ b/kv/store_index.go @@ -76,94 +76,14 @@ func (s *IndexStore) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error { } // Find provides a mechanism for looking through the bucket via -// the set options. When a prefix is provided, the prefix is used with -// the index, and not the entity bucket. If you wish to look at the entity -// bucket and seek, then nest into the EntStore here and do that instead. +// the set options. When a prefix is provided, it will be used within +// the entity store. If you would like to search the index store, then +// you can by calling the index store directly. func (s *IndexStore) Find(ctx context.Context, tx Tx, opts FindOpts) error { span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - if len(opts.Prefix) == 0 { - return s.EntStore.Find(ctx, tx, opts) - } - - entCaptureFn := opts.CaptureFn - kvStream, filterFn := s.indexFilterStream(ctx, tx, opts.FilterEntFn) - opts.FilterEntFn = filterFn - opts.CaptureFn = func(key []byte, indexVal interface{}) error { - select { - case <-ctx.Done(): - return ctx.Err() - case fromFilter, ok := <-kvStream: - if !ok { - return nil - } - if fromFilter.err != nil { - return fromFilter.err - } - return entCaptureFn(fromFilter.k, fromFilter.v) - } - } - return s.IndexStore.Find(ctx, tx, opts) - -} - -func (s *IndexStore) indexFilterStream(ctx context.Context, tx Tx, entFilterFn FilterFn) (<-chan struct { - k []byte - v interface{} - err error -}, func([]byte, interface{}) bool) { - - kvStream := make(chan struct { - k []byte - v interface{} - err error - }, 1) - - type kve struct { - k []byte - v interface{} - err error - } - - send := func(key []byte, v interface{}, err error) bool { - select { - case <-ctx.Done(): - case kvStream <- kve{k: key, v: v, err: err}: - } - return true - } - - return kvStream, func(key []byte, indexVal interface{}) (isValid bool) { - defer func() { - if !isValid { - close(kvStream) - } - }() - ent, err := s.IndexStore.ConvertValToEntFn(key, indexVal) - if err != nil { - return send(nil, nil, err) - } - - entVal, err := s.EntStore.FindEnt(ctx, tx, ent) - if err != nil { - return send(nil, nil, err) - } - - entKey, err := s.EntStore.EntKey(ctx, ent) - if err != nil { - return send(nil, nil, err) - } - - if entFilterFn == nil { - return send(entKey, entVal, nil) - } - - if matches := entFilterFn(entKey, entVal); matches { - return send(entKey, entVal, nil) - } - return false - } + return s.EntStore.Find(ctx, tx, opts) } // FindEnt returns the decoded entity body via teh provided entity. diff --git a/kv/store_index_test.go b/kv/store_index_test.go index 7569649136..09358f3348 100644 --- a/kv/store_index_test.go +++ b/kv/store_index_test.go @@ -258,36 +258,5 @@ func TestIndexStore(t *testing.T) { } assert.Equal(t, expected, actuals) }) - - t.Run("lookup via orgID", func(t *testing.T) { - base, done, kvStore := newFooIndexStore(t, "find_index_search") - defer done() - - expectedEnts := []kv.Entity{ - newFooEnt(1, 9000, "foo_0"), - newFooEnt(2, 9001, "foo_1"), - newFooEnt(3, 9003, "foo_2"), - newFooEnt(4, 9003, "foo_3"), - } - - seedEnts(t, kvStore, base, expectedEnts...) - - var actuals []interface{} - view(t, kvStore, func(tx kv.Tx) error { - return base.Find(context.TODO(), tx, kv.FindOpts{ - Prefix: encodeID(t, 9003), - CaptureFn: func(key []byte, decodedVal interface{}) error { - actuals = append(actuals, decodedVal) - return nil - }, - }) - }) - - expected := []interface{}{ - expectedEnts[2].Body, - expectedEnts[3].Body, - } - assert.Equal(t, expected, actuals) - }) }) }