diff --git a/inmem/kv.go b/inmem/kv.go index 4697c989b0..e853b8982a 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -105,7 +105,7 @@ func (t *Tx) createBucketIfNotExists(b []byte) (kv.Bucket, error) { if t.writable { bkt, ok := t.kv.buckets[string(b)] if !ok { - bkt = &Bucket{btree.New(2)} + bkt = &Bucket{btree: btree.New(2)} t.kv.buckets[string(b)] = bkt t.kv.ro[string(b)] = &bucket{Bucket: bkt} return bkt, nil @@ -133,6 +133,7 @@ func (t *Tx) Bucket(b []byte) (kv.Bucket, error) { // Bucket is a btree that implements kv.Bucket. type Bucket struct { + mu sync.RWMutex btree *btree.BTree } @@ -169,6 +170,9 @@ func (i *item) Less(b btree.Item) bool { // Get retrieves the value at the provided key. func (b *Bucket) Get(key []byte) ([]byte, error) { + b.mu.RLock() + defer b.mu.RUnlock() + i := b.btree.Get(&item{key: key}) if i == nil { @@ -185,12 +189,18 @@ func (b *Bucket) Get(key []byte) ([]byte, error) { // Put sets the key value pair provided. func (b *Bucket) Put(key []byte, value []byte) error { + b.mu.Lock() + defer b.mu.Unlock() + _ = b.btree.ReplaceOrInsert(&item{key: key, value: value}) return nil } // Delete removes the key provided. func (b *Bucket) Delete(key []byte) error { + b.mu.Lock() + defer b.mu.Unlock() + _ = b.btree.Delete(&item{key: key}) return nil } @@ -213,6 +223,9 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { } func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) { + b.mu.RLock() + defer b.mu.RUnlock() + fn := o.PredicateFn var pairs []kv.Pair @@ -268,6 +281,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward ) go func() { + defer close(pairs) var ( @@ -284,7 +298,9 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward } } + b.mu.RLock() iterate(seek, config, func(i btree.Item) bool { + select { case <-stop: // if signalled to stop then exit iteration @@ -322,6 +338,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward // we've been signalled to stop return false }) + b.mu.RUnlock() // send if any left in batch send(batch) @@ -359,6 +376,7 @@ func (c *ForwardCursor) Err() error { // Close releases the producing goroutines for the forward cursor. // It blocks until the producing goroutine exits. func (c *ForwardCursor) Close() error { + if c.closed { return nil } diff --git a/kv/scrapers.go b/kv/scrapers.go index 05d22e8fe4..13f2b5b2e8 100644 --- a/kv/scrapers.go +++ b/kv/scrapers.go @@ -113,12 +113,27 @@ func (s *Service) listTargets(ctx context.Context, tx Tx, filter influxdb.Scrape return nil, err } - cur, err := bucket.Cursor() + cur, err := bucket.ForwardCursor(nil) if err != nil { return nil, UnexpectedScrapersBucketError(err) } - for k, v := cur.First(); k != nil; k, v = cur.Next() { + var org *influxdb.Organization + if filter.Org != nil { + org, err = s.findOrganizationByName(ctx, tx, *filter.Org) + if err != nil { + return nil, err + } + } + + if filter.OrgID != nil { + org, err = s.findOrganizationByID(ctx, tx, *filter.OrgID) + if err != nil { + return nil, err + } + } + + for k, v := cur.Next(); k != nil; k, v = cur.Next() { target, err := unmarshalScraper(v) if err != nil { return nil, err @@ -131,24 +146,11 @@ func (s *Service) listTargets(ctx context.Context, tx Tx, filter influxdb.Scrape if filter.Name != nil && target.Name != *filter.Name { continue } - if filter.Org != nil { - o, err := s.findOrganizationByName(ctx, tx, *filter.Org) - if err != nil { - return nil, err - } - if target.OrgID != o.ID { - continue - } - } - if filter.OrgID != nil { - o, err := s.findOrganizationByID(ctx, tx, *filter.OrgID) - if err != nil { - return nil, err - } - if target.OrgID != o.ID { - continue - } + + if org != nil && target.OrgID != org.ID { + continue } + targets = append(targets, *target) } return targets, nil