influxdb/inmem/kv.go

474 lines
9.2 KiB
Go

package inmem
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"github.com/google/btree"
"github.com/influxdata/influxdb/v2/kv"
)
// ensure *KVStore implement kv.SchemaStore interface
var _ kv.SchemaStore = (*KVStore)(nil)
// cursorBatchSize is the size of a batch sent by a forward cursors
// tree iterator
const cursorBatchSize = 1000
// KVStore is an in memory btree backed kv.Store.
type KVStore struct {
mu sync.RWMutex
buckets map[string]*Bucket
ro map[string]*bucket
}
// NewKVStore creates an instance of a KVStore.
func NewKVStore() *KVStore {
return &KVStore{
buckets: map[string]*Bucket{},
ro: map[string]*bucket{},
}
}
// View opens up a transaction with a read lock.
func (s *KVStore) View(ctx context.Context, fn func(kv.Tx) error) error {
s.mu.RLock()
defer s.mu.RUnlock()
return fn(&Tx{
kv: s,
writable: false,
ctx: ctx,
})
}
// Update opens up a transaction with a write lock.
func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error {
s.mu.Lock()
defer s.mu.Unlock()
return fn(&Tx{
kv: s,
writable: true,
ctx: ctx,
})
}
// CreateBucket creates a bucket with the provided name if one
// does not exist.
func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.buckets[string(name)]
if !ok {
bkt := &Bucket{btree: btree.New(2)}
s.buckets[string(name)] = bkt
s.ro[string(name)] = &bucket{Bucket: bkt}
}
return nil
}
// DeleteBucket creates a bucket with the provided name if one
// does not exist.
func (s *KVStore) DeleteBucket(ctx context.Context, name []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.buckets, string(name))
return nil
}
func (s *KVStore) RLock() {
s.mu.RLock()
}
func (s *KVStore) RUnlock() {
s.mu.RUnlock()
}
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented")
}
func (s *KVStore) Restore(ctx context.Context, r io.Reader) error {
panic("not implemented")
}
// Flush removes all data from the buckets. Used for testing.
func (s *KVStore) Flush(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()
for _, b := range s.buckets {
b.btree.Clear(false)
}
}
// Buckets returns the names of all buckets within inmem.KVStore.
func (s *KVStore) Buckets(ctx context.Context) [][]byte {
s.mu.RLock()
defer s.mu.RUnlock()
buckets := make([][]byte, 0, len(s.buckets))
for b := range s.buckets {
buckets = append(buckets, []byte(b))
}
return buckets
}
// Tx is an in memory transaction.
// TODO: make transactions actually transactional
type Tx struct {
kv *KVStore
writable bool
ctx context.Context
}
// Context returns the context for the transaction.
func (t *Tx) Context() context.Context {
return t.ctx
}
// WithContext sets the context for the transaction.
func (t *Tx) WithContext(ctx context.Context) {
t.ctx = ctx
}
// Bucket retrieves the bucket at the provided key.
func (t *Tx) Bucket(b []byte) (kv.Bucket, error) {
bkt, ok := t.kv.buckets[string(b)]
if !ok {
return nil, fmt.Errorf("bucket %q: %w", string(b), kv.ErrBucketNotFound)
}
if t.writable {
return bkt, nil
}
return t.kv.ro[string(b)], nil
}
// Bucket is a btree that implements kv.Bucket.
type Bucket struct {
mu sync.RWMutex
btree *btree.BTree
}
type bucket struct {
kv.Bucket
}
// Put wraps the put method of a kv bucket and ensures that the
// bucket is writable.
func (b *bucket) Put(_, _ []byte) error {
return kv.ErrTxNotWritable
}
// Delete wraps the delete method of a kv bucket and ensures that the
// bucket is writable.
func (b *bucket) Delete(_ []byte) error {
return kv.ErrTxNotWritable
}
type item struct {
key []byte
value []byte
}
// Less is used to implement btree.Item.
func (i *item) Less(b btree.Item) bool {
j, ok := b.(*item)
if !ok {
return false
}
return bytes.Compare(i.key, j.key) < 0
}
// Get retrieves the value at the provided key.
func (b *Bucket) Get(key []byte) ([]byte, error) {
b.mu.RLock()
defer b.mu.RUnlock()
i := b.btree.Get(&item{key: key})
if i == nil {
return nil, kv.ErrKeyNotFound
}
j, ok := i.(*item)
if !ok {
return nil, fmt.Errorf("error item is type %T not *item", i)
}
return j.value, nil
}
// Get retrieves a batch of values for the provided keys.
func (b *Bucket) GetBatch(keys ...[]byte) ([][]byte, error) {
b.mu.RLock()
defer b.mu.RUnlock()
values := make([][]byte, len(keys))
for idx, key := range keys {
i := b.btree.Get(&item{key: key})
if i == nil {
// leave value as nil slice
continue
}
j, ok := i.(*item)
if !ok {
return nil, fmt.Errorf("error item is type %T not *item", i)
}
values[idx] = j.value
}
return values, nil
}
// Put sets the key value pair provided.
func (b *Bucket) Put(key []byte, value []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
_ = b.btree.ReplaceOrInsert(&item{key: key, value: value})
return nil
}
// Delete removes the key provided.
func (b *Bucket) Delete(key []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
_ = b.btree.Delete(&item{key: key})
return nil
}
// Cursor creates a static cursor from all entries in the database.
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
var o kv.CursorHints
for _, opt := range opts {
opt(&o)
}
// TODO we should do this by using the Ascend/Descend methods that
// the btree provides.
pairs, err := b.getAll(&o)
if err != nil {
return nil, err
}
return kv.NewStaticCursor(pairs), nil
}
func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
b.mu.RLock()
defer b.mu.RUnlock()
fn := o.PredicateFn
var pairs []kv.Pair
var err error
b.btree.Ascend(func(i btree.Item) bool {
j, ok := i.(*item)
if !ok {
err = fmt.Errorf("error item is type %T not *item", i)
return false
}
if fn == nil || fn(j.key, j.value) {
pairs = append(pairs, kv.Pair{Key: j.key, Value: j.value})
}
return true
})
if err != nil {
return nil, err
}
return pairs, nil
}
type pair struct {
kv.Pair
err error
}
// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
config := kv.NewCursorConfig(opts...)
if config.Prefix != nil && !bytes.HasPrefix(seek, config.Prefix) {
return nil, fmt.Errorf("seek bytes %q not prefixed with %q: %w", string(seek), string(config.Prefix), kv.ErrSeekMissingPrefix)
}
var (
pairs = make(chan []pair)
stop = make(chan struct{})
send = func(batch []pair) bool {
if len(batch) == 0 {
return true
}
select {
case pairs <- batch:
return true
case <-stop:
return false
}
}
)
go func() {
defer close(pairs)
var (
batch []pair
fn = config.Hints.PredicateFn
iterate = b.ascend
skipFirst = config.SkipFirst
seen int
)
if config.Direction == kv.CursorDescending {
iterate = b.descend
if len(seek) == 0 {
if item, ok := b.btree.Max().(*item); ok {
seek = item.key
}
}
}
b.mu.RLock()
iterate(seek, config, func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
return false
default:
}
// if skip first
if skipFirst {
skipFirst = false
return true
}
// enforce limit
if config.Limit != nil && seen >= *config.Limit {
return false
}
j, ok := i.(*item)
if !ok {
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
return false
}
if config.Prefix != nil && !bytes.HasPrefix(j.key, config.Prefix) {
return false
}
if fn == nil || fn(j.key, j.value) {
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
seen++
}
if len(batch) < cursorBatchSize {
return true
}
if send(batch) {
// batch flushed successfully so we can
// begin a new batch
batch = nil
return true
}
// we've been signalled to stop
return false
})
b.mu.RUnlock()
// send if any left in batch
send(batch)
}()
return &ForwardCursor{pairs: pairs, stop: stop}, nil
}
func (b *Bucket) ascend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) {
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
}
func (b *Bucket) descend(seek []byte, config kv.CursorConfig, it btree.ItemIterator) {
b.btree.DescendLessOrEqual(&item{key: seek}, it)
}
// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
type ForwardCursor struct {
pairs <-chan []pair
cur []pair
n int
stop chan struct{}
closed bool
// error found during iteration
err error
}
// Err returns a non-nil error when an error occurred during cursor iteration.
func (c *ForwardCursor) Err() error {
return c.err
}
// Close releases the producing goroutines for the forward cursor.
// It blocks until the producing goroutine exits.
func (c *ForwardCursor) Close() error {
if c.closed {
return nil
}
close(c.stop)
c.closed = true
return nil
}
// Next returns the next key/value pair in the cursor
func (c *ForwardCursor) Next() ([]byte, []byte) {
if c.err != nil || c.closed {
return nil, nil
}
if c.n >= len(c.cur) {
var ok bool
c.cur, ok = <-c.pairs
if !ok {
return nil, nil
}
c.n = 0
}
pair := c.cur[c.n]
c.err = pair.err
c.n++
return pair.Key, pair.Value
}