feat(kv): add cursor option skip first item (#16758)
parent
b2da311aa2
commit
fe990aa4db
14
bolt/kv.go
14
bolt/kv.go
|
@ -226,12 +226,18 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix)
|
||||
}
|
||||
|
||||
return &Cursor{
|
||||
c := &Cursor{
|
||||
cursor: cursor,
|
||||
key: key,
|
||||
value: value,
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// only remember first seeked item if not skipped
|
||||
if !config.SkipFirst {
|
||||
c.key = key
|
||||
c.value = value
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Cursor retrieves a cursor for iterating through the entries
|
||||
|
|
15
inmem/kv.go
15
inmem/kv.go
|
@ -281,13 +281,13 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
)
|
||||
|
||||
go func() {
|
||||
|
||||
defer close(pairs)
|
||||
|
||||
var (
|
||||
batch []pair
|
||||
fn = config.Hints.PredicateFn
|
||||
iterate = b.ascend
|
||||
batch []pair
|
||||
fn = config.Hints.PredicateFn
|
||||
iterate = b.ascend
|
||||
skipFirst = config.SkipFirst
|
||||
)
|
||||
|
||||
if config.Direction == kv.CursorDescending {
|
||||
|
@ -300,7 +300,6 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
|
||||
b.mu.RLock()
|
||||
iterate(seek, config, func(i btree.Item) bool {
|
||||
|
||||
select {
|
||||
case <-stop:
|
||||
// if signalled to stop then exit iteration
|
||||
|
@ -308,6 +307,12 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
|
|||
default:
|
||||
}
|
||||
|
||||
// if skip first
|
||||
if skipFirst {
|
||||
skipFirst = false
|
||||
return true
|
||||
}
|
||||
|
||||
j, ok := i.(*item)
|
||||
if !ok {
|
||||
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
|
||||
|
|
|
@ -148,6 +148,7 @@ type CursorConfig struct {
|
|||
Direction CursorDirection
|
||||
Hints CursorHints
|
||||
Prefix []byte
|
||||
SkipFirst bool
|
||||
}
|
||||
|
||||
// NewCursorConfig constructs and configures a CursorConfig used to configure
|
||||
|
@ -190,3 +191,11 @@ func WithCursorPrefix(prefix []byte) CursorOption {
|
|||
c.Prefix = prefix
|
||||
}
|
||||
}
|
||||
|
||||
// WithCursorSkipFirstItem skips returning the first item found within
|
||||
// the seek.
|
||||
func WithCursorSkipFirstItem() CursorOption {
|
||||
return func(c *CursorConfig) {
|
||||
c.SkipFirst = true
|
||||
}
|
||||
}
|
||||
|
|
24
kv/task.go
24
kv/task.go
|
@ -367,7 +367,10 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
|
|||
return nil, 0, influxdb.ErrInvalidTaskID
|
||||
}
|
||||
|
||||
key := prefix
|
||||
var (
|
||||
key = prefix
|
||||
opts []CursorOption
|
||||
)
|
||||
// we can filter by orgID
|
||||
if filter.After != nil {
|
||||
key, err = taskOrgKey(org.ID, *filter.After)
|
||||
|
@ -375,11 +378,13 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
|
|||
return nil, 0, err
|
||||
}
|
||||
|
||||
// append a extra character because we want to skip "after" record
|
||||
key = []byte(string(key) + "\x00")
|
||||
opts = append(opts, WithCursorSkipFirstItem())
|
||||
}
|
||||
|
||||
c, err := indexBucket.ForwardCursor(key, WithCursorPrefix(prefix))
|
||||
c, err := indexBucket.ForwardCursor(
|
||||
key,
|
||||
append(opts, WithCursorPrefix(prefix))...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||
}
|
||||
|
@ -476,18 +481,21 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
|
|||
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||
}
|
||||
|
||||
var seek []byte
|
||||
var (
|
||||
seek []byte
|
||||
opts []CursorOption
|
||||
)
|
||||
|
||||
if filter.After != nil {
|
||||
seek, err = taskKey(*filter.After)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// append a extra character because we want to skip "after" record
|
||||
seek = []byte(string(seek) + "\x00")
|
||||
opts = append(opts, WithCursorSkipFirstItem())
|
||||
}
|
||||
|
||||
c, err := taskBucket.ForwardCursor(seek)
|
||||
c, err := taskBucket.ForwardCursor(seek, opts...)
|
||||
if err != nil {
|
||||
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||
}
|
||||
|
|
|
@ -737,6 +737,25 @@ func KVForwardCursor(
|
|||
},
|
||||
exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"},
|
||||
},
|
||||
{
|
||||
name: "prefix - skip first",
|
||||
fields: KVStoreFields{
|
||||
Bucket: []byte("bucket"),
|
||||
Pairs: pairs(
|
||||
"aa/00", "aa/01",
|
||||
"aaa/00", "aaa/01", "aaa/02", "aaa/03",
|
||||
"bbb/00", "bbb/01", "bbb/02"),
|
||||
},
|
||||
args: args{
|
||||
seek: "aaa/00",
|
||||
until: "bbb/02",
|
||||
opts: []kv.CursorOption{
|
||||
kv.WithCursorPrefix([]byte("aaa")),
|
||||
kv.WithCursorSkipFirstItem(),
|
||||
},
|
||||
},
|
||||
exp: []string{"aaa/01", "aaa/02", "aaa/03"},
|
||||
},
|
||||
{
|
||||
name: "prefix - does not prefix seek",
|
||||
fields: KVStoreFields{
|
||||
|
|
Loading…
Reference in New Issue