fix(kv): fix issue with cursor not closing inside TX

also drops the prefix search optimizations from index store as well.
pull/16477/head
Johnny Steenbergen 2020-01-09 09:22:41 -08:00 committed by Johnny Steenbergen
parent 98b4be7d08
commit 825b4f8da7
5 changed files with 29 additions and 181 deletions

View File

@ -154,6 +154,9 @@ func filterChecksFn(idMap map[influxdb.ID]bool, filter influxdb.CheckFilter) fun
if filter.ID != nil && c.GetID() != *filter.ID {
return false
}
if filter.OrgID != nil && c.GetOrgID() != *filter.OrgID {
return false
}
if filter.Name != nil && c.GetName() != *filter.Name {
return false
}
@ -198,15 +201,6 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o
filter.OrgID = &o.ID
}
var prefix []byte
if filter.OrgID != nil {
encs := []EncodeFn{EncID(*filter.OrgID)}
if filter.Name != nil {
encs = append(encs, EncString(*filter.Name))
}
prefix, _ = s.checkStore.IndexStore.EntKey(ctx, Entity{UniqueKey: Encode(encs...)})
}
var opt influxdb.FindOptions
if len(opts) > 0 {
opt = opts[0]
@ -217,7 +211,6 @@ func (s *Service) FindChecks(ctx context.Context, filter influxdb.CheckFilter, o
Descending: opt.Descending,
Offset: opt.Offset,
Limit: opt.Limit,
Prefix: prefix,
FilterEntFn: func(k []byte, v interface{}) bool {
ch, ok := v.(influxdb.Check)
if err := IsErrUnexpectedDecodeVal(ok); err != nil {

View File

@ -1,7 +1,6 @@
package kv
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -505,52 +504,37 @@ type iterator struct {
offset int
prefix []byte
seekChan <-chan struct{ k, v []byte }
nextFn func() (key, val []byte)
decodeFn func(key, val []byte) (k []byte, decodedVal interface{}, err error)
filterFn FilterFn
}
func (i *iterator) Next(ctx context.Context) (key []byte, val interface{}, err error) {
span, ctx := tracing.StartSpanFromContext(ctx)
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if i.limit > 0 && i.counter >= i.limit+i.offset {
return nil, nil, nil
}
var (
k, vRaw []byte
nextFn func() ([]byte, []byte)
)
var k, vRaw []byte
switch {
case i.nextFn != nil:
k, vRaw = i.nextFn()
case len(i.prefix) > 0:
i.seek(ctx)
nextFn = func() ([]byte, []byte) {
select {
case <-ctx.Done():
return nil, nil
case kv := <-i.seekChan:
return kv.k, kv.v
}
}
k, vRaw = nextFn()
case i.counter == 0 && i.descending:
k, vRaw = i.cursor.Last()
nextFn = i.cursor.Prev
case i.counter == 0:
k, vRaw = i.cursor.First()
nextFn = i.cursor.Next
k, vRaw = i.cursor.Seek(i.prefix)
i.nextFn = i.cursor.Next
case i.descending:
k, vRaw = i.cursor.Prev()
nextFn = i.cursor.Prev
k, vRaw = i.cursor.Last()
i.nextFn = i.cursor.Prev
default:
k, vRaw = i.cursor.Next()
nextFn = i.cursor.Next
k, vRaw = i.cursor.First()
i.nextFn = i.cursor.Next
}
k, decodedVal, err := i.decodeFn(k, vRaw)
for ; ; k, decodedVal, err = i.decodeFn(nextFn()) {
for ; ; k, decodedVal, err = i.decodeFn(i.nextFn()) {
if err != nil {
return nil, nil, err
}
@ -585,31 +569,6 @@ func (i *iterator) isNext(k []byte, v interface{}) bool {
return true
}
func (i *iterator) seek(ctx context.Context) {
if i.seekChan != nil || len(i.prefix) == 0 {
return
}
out := make(chan struct{ k, v []byte })
i.seekChan = out
go func() {
defer close(out)
for k, v := i.cursor.Seek(i.prefix); bytes.HasPrefix(k, i.prefix); k, v = i.cursor.Next() {
if len(k) == 0 {
return
}
select {
case <-ctx.Done():
return
case out <- struct{ k, v []byte }{k: k, v: v}:
}
}
}()
}
func IsErrUnexpectedDecodeVal(ok bool) error {
if ok {
return nil

View File

@ -284,9 +284,9 @@ func testFind(t *testing.T, fn func(t *testing.T, suffix string) (storeBase, fun
expectedEnts := []kv.Entity{
newFooEnt(1, 9000, "foo_0"),
newFooEnt(2, 9000, "foo_1"),
newFooEnt(3, 9003, "foo_2"),
newFooEnt(4, 9004, "foo_3"),
newFooEnt(2000, 9000, "foo_1"),
newFooEnt(3000000, 9003, "foo_2"),
newFooEnt(4000000000, 9004, "foo_3"),
}
tests := []struct {
@ -330,6 +330,13 @@ func testFind(t *testing.T, fn func(t *testing.T, suffix string) (storeBase, fun
},
expected: toIfaces(expectedEnts[2]),
},
{
name: "with id prefix",
opts: kv.FindOpts{
Prefix: encodeID(t, 3000000)[:influxdb.IDLength-5],
},
expected: toIfaces(expectedEnts[2], expectedEnts[3]),
},
}
for _, tt := range tests {

View File

@ -76,94 +76,14 @@ func (s *IndexStore) DeleteEnt(ctx context.Context, tx Tx, ent Entity) error {
}
// Find provides a mechanism for looking through the bucket via
// the set options. When a prefix is provided, the prefix is used with
// the index, and not the entity bucket. If you wish to look at the entity
// bucket and seek, then nest into the EntStore here and do that instead.
// the set options. When a prefix is provided, it will be used within
// the entity store. If you would like to search the index store, then
// you can by calling the index store directly.
func (s *IndexStore) Find(ctx context.Context, tx Tx, opts FindOpts) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
if len(opts.Prefix) == 0 {
return s.EntStore.Find(ctx, tx, opts)
}
entCaptureFn := opts.CaptureFn
kvStream, filterFn := s.indexFilterStream(ctx, tx, opts.FilterEntFn)
opts.FilterEntFn = filterFn
opts.CaptureFn = func(key []byte, indexVal interface{}) error {
select {
case <-ctx.Done():
return ctx.Err()
case fromFilter, ok := <-kvStream:
if !ok {
return nil
}
if fromFilter.err != nil {
return fromFilter.err
}
return entCaptureFn(fromFilter.k, fromFilter.v)
}
}
return s.IndexStore.Find(ctx, tx, opts)
}
func (s *IndexStore) indexFilterStream(ctx context.Context, tx Tx, entFilterFn FilterFn) (<-chan struct {
k []byte
v interface{}
err error
}, func([]byte, interface{}) bool) {
kvStream := make(chan struct {
k []byte
v interface{}
err error
}, 1)
type kve struct {
k []byte
v interface{}
err error
}
send := func(key []byte, v interface{}, err error) bool {
select {
case <-ctx.Done():
case kvStream <- kve{k: key, v: v, err: err}:
}
return true
}
return kvStream, func(key []byte, indexVal interface{}) (isValid bool) {
defer func() {
if !isValid {
close(kvStream)
}
}()
ent, err := s.IndexStore.ConvertValToEntFn(key, indexVal)
if err != nil {
return send(nil, nil, err)
}
entVal, err := s.EntStore.FindEnt(ctx, tx, ent)
if err != nil {
return send(nil, nil, err)
}
entKey, err := s.EntStore.EntKey(ctx, ent)
if err != nil {
return send(nil, nil, err)
}
if entFilterFn == nil {
return send(entKey, entVal, nil)
}
if matches := entFilterFn(entKey, entVal); matches {
return send(entKey, entVal, nil)
}
return false
}
return s.EntStore.Find(ctx, tx, opts)
}
// FindEnt returns the decoded entity body via teh provided entity.

View File

@ -258,36 +258,5 @@ func TestIndexStore(t *testing.T) {
}
assert.Equal(t, expected, actuals)
})
t.Run("lookup via orgID", func(t *testing.T) {
base, done, kvStore := newFooIndexStore(t, "find_index_search")
defer done()
expectedEnts := []kv.Entity{
newFooEnt(1, 9000, "foo_0"),
newFooEnt(2, 9001, "foo_1"),
newFooEnt(3, 9003, "foo_2"),
newFooEnt(4, 9003, "foo_3"),
}
seedEnts(t, kvStore, base, expectedEnts...)
var actuals []interface{}
view(t, kvStore, func(tx kv.Tx) error {
return base.Find(context.TODO(), tx, kv.FindOpts{
Prefix: encodeID(t, 9003),
CaptureFn: func(key []byte, decodedVal interface{}) error {
actuals = append(actuals, decodedVal)
return nil
},
})
})
expected := []interface{}{
expectedEnts[2].Body,
expectedEnts[3].Body,
}
assert.Equal(t, expected, actuals)
})
})
}