fix(kv): Update scrapers to use new forward cursor (#16647)

* fix(kv): Update scrapers to use new forward cursor

I also made a minor update to move a db lookup outside of a for loop to save
time and optimize

* fix(inmem): fix a potential race condition
pull/16671/head
Lyon Hill 2020-01-24 13:35:47 -07:00 committed by GitHub
parent 7c2980d9ea
commit cb7f053d45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 20 deletions

View File

@ -105,7 +105,7 @@ func (t *Tx) createBucketIfNotExists(b []byte) (kv.Bucket, error) {
if t.writable {
bkt, ok := t.kv.buckets[string(b)]
if !ok {
bkt = &Bucket{btree.New(2)}
bkt = &Bucket{btree: btree.New(2)}
t.kv.buckets[string(b)] = bkt
t.kv.ro[string(b)] = &bucket{Bucket: bkt}
return bkt, nil
@ -133,6 +133,7 @@ func (t *Tx) Bucket(b []byte) (kv.Bucket, error) {
// Bucket is a btree that implements kv.Bucket.
type Bucket struct {
mu sync.RWMutex
btree *btree.BTree
}
@ -169,6 +170,9 @@ func (i *item) Less(b btree.Item) bool {
// Get retrieves the value at the provided key.
func (b *Bucket) Get(key []byte) ([]byte, error) {
b.mu.RLock()
defer b.mu.RUnlock()
i := b.btree.Get(&item{key: key})
if i == nil {
@ -185,12 +189,18 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
// Put sets the key value pair provided.
func (b *Bucket) Put(key []byte, value []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
_ = b.btree.ReplaceOrInsert(&item{key: key, value: value})
return nil
}
// Delete removes the key provided.
func (b *Bucket) Delete(key []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
_ = b.btree.Delete(&item{key: key})
return nil
}
@ -213,6 +223,9 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
}
func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
b.mu.RLock()
defer b.mu.RUnlock()
fn := o.PredicateFn
var pairs []kv.Pair
@ -268,6 +281,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
)
go func() {
defer close(pairs)
var (
@ -284,7 +298,9 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
}
}
b.mu.RLock()
iterate(seek, config, func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
@ -322,6 +338,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
// we've been signalled to stop
return false
})
b.mu.RUnlock()
// send if any left in batch
send(batch)
@ -359,6 +376,7 @@ func (c *ForwardCursor) Err() error {
// Close releases the producing goroutines for the forward cursor.
// It blocks until the producing goroutine exits.
func (c *ForwardCursor) Close() error {
if c.closed {
return nil
}

View File

@ -113,12 +113,27 @@ func (s *Service) listTargets(ctx context.Context, tx Tx, filter influxdb.Scrape
return nil, err
}
cur, err := bucket.Cursor()
cur, err := bucket.ForwardCursor(nil)
if err != nil {
return nil, UnexpectedScrapersBucketError(err)
}
for k, v := cur.First(); k != nil; k, v = cur.Next() {
var org *influxdb.Organization
if filter.Org != nil {
org, err = s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return nil, err
}
}
if filter.OrgID != nil {
org, err = s.findOrganizationByID(ctx, tx, *filter.OrgID)
if err != nil {
return nil, err
}
}
for k, v := cur.Next(); k != nil; k, v = cur.Next() {
target, err := unmarshalScraper(v)
if err != nil {
return nil, err
@ -131,24 +146,11 @@ func (s *Service) listTargets(ctx context.Context, tx Tx, filter influxdb.Scrape
if filter.Name != nil && target.Name != *filter.Name {
continue
}
if filter.Org != nil {
o, err := s.findOrganizationByName(ctx, tx, *filter.Org)
if err != nil {
return nil, err
}
if target.OrgID != o.ID {
continue
}
}
if filter.OrgID != nil {
o, err := s.findOrganizationByID(ctx, tx, *filter.OrgID)
if err != nil {
return nil, err
}
if target.OrgID != o.ID {
continue
}
if org != nil && target.OrgID != org.ID {
continue
}
targets = append(targets, *target)
}
return targets, nil