influxdb/inmem/kv_test.go

248 lines
5.2 KiB
Go

package inmem_test
import (
"bufio"
"context"
"fmt"
"math"
"os"
"reflect"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
platformtesting "github.com/influxdata/influxdb/v2/testing"
)
// initKVStore builds an in-memory kv.Store seeded from the fixture f: it
// creates f.Bucket and then writes every pair in f.Pairs into that bucket
// inside a single update transaction. The test is aborted if seeding fails.
//
// The second return value is a cleanup callback; for the in-memory store it
// does nothing.
func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func()) {
	store := inmem.NewKVStore()
	mustCreateBucket(t, store, f.Bucket)

	// seed writes all fixture pairs; the first failing lookup or Put aborts
	// the transaction by returning its error.
	seed := func(tx kv.Tx) error {
		bkt, err := tx.Bucket(f.Bucket)
		if err != nil {
			return err
		}
		for _, pair := range f.Pairs {
			if err := bkt.Put(pair.Key, pair.Value); err != nil {
				return err
			}
		}
		return nil
	}

	if err := store.Update(context.Background(), seed); err != nil {
		t.Fatalf("failed to put keys: %v", err)
	}

	noop := func() {}
	return store, noop
}
// TestKVStore hands initKVStore to platformtesting.KVStore so that the checks
// defined there run against the in-memory store.
func TestKVStore(t *testing.T) {
	platformtesting.KVStore(initKVStore, t)
}
// TestKVStore_Buckets checks that KVStore.Buckets reports every bucket that
// was created on a fresh store. The result is sorted before it is compared,
// so the order in which Buckets returns names is not part of the check.
func TestKVStore_Buckets(t *testing.T) {
	type testCase struct {
		name    string
		buckets []string
		want    [][]byte
	}

	cases := []testCase{
		{
			name:    "single bucket is returned if only one bucket is added",
			buckets: []string{"b1"},
			want:    [][]byte{[]byte("b1")},
		},
		{
			name:    "multiple buckets are returned if multiple buckets added",
			buckets: []string{"b1", "b2", "b3"},
			want:    [][]byte{[]byte("b1"), []byte("b2"), []byte("b3")},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			store := inmem.NewKVStore()
			for _, name := range tc.buckets {
				mustCreateBucket(t, store, []byte(name))
			}

			got := store.Buckets(context.Background())
			// Order the names lexicographically so the comparison is stable.
			sort.Slice(got, func(a, b int) bool {
				return string(got[a]) < string(got[b])
			})

			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("KVStore.Buckets() = %v, want %v", got, tc.want)
		})
	}
}
// TestKVStore_Bucket_CursorHintPredicate fills a bucket with the first ten
// lines of the key fixture (each line is stored as both key and value) and
// checks that a cursor opened with a predicate hint yields exactly one entry,
// whether the predicate inspects the key or the value.
//
// Errors from View, Bucket and Cursor fail the subtest instead of being
// discarded, so a broken setup can no longer pass silently.
func TestKVStore_Bucket_CursorHintPredicate(t *testing.T) {
	s := inmem.NewKVStore()
	bucket := "urm"
	mustCreateBucket(t, s, []byte(bucket))
	fillBucket(t, s, bucket, 10)

	tests := []struct {
		name      string
		predicate kv.CursorPredicateFunc
	}{
		{
			name: "filter by key",
			predicate: func(key, _ []byte) bool {
				// Entries shorter than 32 bytes are let through unfiltered;
				// the length guard also keeps key[16:] in range.
				return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3"
			},
		},
		{
			name: "filter by value",
			predicate: func(_, val []byte) bool {
				return len(val) < 32 || string(val[16:]) == "8d5dc900004589c3"
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := s.View(context.Background(), func(tx kv.Tx) error {
				b, err := tx.Bucket([]byte(bucket))
				if err != nil {
					return err
				}

				cur, err := b.Cursor(kv.WithCursorHintPredicate(tt.predicate))
				if err != nil {
					return err
				}

				count := 0
				for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
					count++
				}

				if exp, got := 1, count; got != exp {
					t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
				}
				return nil
			})
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
		})
	}
}
// openCursor opens a read-only transaction on s, obtains a cursor over the
// named bucket using the supplied hints, and passes it to fn (when fn is
// non-nil) while the transaction is still open.
//
// Any error from the bucket lookup, from opening the cursor, or from View
// itself fails t. The failure is raised from this function rather than from
// inside the transaction closure, so that t.Helper attributes it to the
// caller of openCursor.
func openCursor(t testing.TB, s *inmem.KVStore, bucket string, fn func(cur kv.Cursor), hints ...kv.CursorHint) {
	t.Helper()

	err := s.View(context.Background(), func(tx kv.Tx) error {
		b, err := tx.Bucket([]byte(bucket))
		if err != nil {
			return err
		}

		cur, err := b.Cursor(hints...)
		if err != nil {
			return err
		}

		if fn != nil {
			fn(cur)
		}
		return nil
	})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
// BenchmarkKVStore_Bucket_Cursor measures a full cursor scan over a bucket
// loaded from the key fixture, once with no cursor hints and once with a
// key-predicate hint. Store setup happens once per outer sub-benchmark; each
// timed iteration opens a new cursor and walks it to the end.
func BenchmarkKVStore_Bucket_Cursor(b *testing.B) {
	searchKey := "629ffa00003dd2ce"
	predicate := kv.CursorPredicateFunc(func(key, _ []byte) bool {
		return len(key) < 32 || string(key[16:]) == searchKey
	})

	// drain walks the cursor until it returns a nil key, discarding entries.
	drain := func(cur kv.Cursor) {
		k, v := cur.First()
		for k != nil {
			_, _ = k, v
			k, v = cur.Next()
		}
	}

	b.Run("16000 keys", func(b *testing.B) {
		store := inmem.NewKVStore()
		name := "urm"
		mustCreateBucket(b, store, []byte(name))
		// A line limit of 0 asks fillBucket to load the whole fixture file.
		fillBucket(b, store, name, 0)

		b.Run("without hint", func(b *testing.B) {
			b.ReportAllocs()
			b.ResetTimer()
			for iter := 0; iter < b.N; iter++ {
				openCursor(b, store, name, drain)
			}
		})

		b.Run("with hint", func(b *testing.B) {
			b.ReportAllocs()
			b.ResetTimer()
			for iter := 0; iter < b.N; iter++ {
				// The hint is built inside the loop, so its construction is
				// part of the measured work.
				openCursor(b, store, name, drain, kv.WithCursorHintPredicate(predicate))
			}
		})
	})
}
// sourceFile is the relative path of the fixture file whose lines fillBucket
// loads as keys.
const sourceFile = "kvdata/keys.txt"

// fillBucket loads lines from sourceFile into the named bucket of s, storing
// each line as both the key and the value of one entry.
//
// lines caps how many lines are read; 0 means "read the whole file". A
// negative value stops after the first line, because the counter is checked
// only after an entry has been written.
//
// Any failure (bucket lookup, opening or reading the file, or a Put) aborts
// the update transaction and fails t.
func fillBucket(t testing.TB, s *inmem.KVStore, bucket string, lines int64) {
	t.Helper()

	err := s.Update(context.Background(), func(tx kv.Tx) error {
		b, err := tx.Bucket([]byte(bucket))
		if err != nil {
			return err
		}

		f, err := os.Open(sourceFile)
		if err != nil {
			return err
		}
		defer f.Close()

		if lines == 0 {
			lines = int64(math.MaxInt64)
		}

		// bufio.Scanner buffers its input itself, so the file is passed in
		// directly.
		scan := bufio.NewScanner(f)
		for scan.Scan() {
			// scan.Bytes may be overwritten by the next Scan call, so take a
			// private copy before handing it to the store.
			key := append([]byte(nil), scan.Bytes()...)
			if err := b.Put(key, key); err != nil {
				return err
			}

			lines--
			if lines <= 0 {
				break
			}
		}

		// Report read errors instead of treating them as end of file.
		return scan.Err()
	})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
// mustCreateBucket creates bucket in store by applying a one-off
// migration.CreateBuckets migration named after the bucket. The test or
// benchmark is aborted if the migration's Up step returns an error.
func mustCreateBucket(t testing.TB, store kv.SchemaStore, bucket []byte) {
	t.Helper()

	name := fmt.Sprintf("create bucket %q", string(bucket))
	spec := migration.CreateBuckets(name, bucket)

	err := spec.Up(context.Background(), store)
	if err != nil {
		t.Fatal(err)
	}
}