248 lines
5.2 KiB
Go
248 lines
5.2 KiB
Go
package inmem_test
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"reflect"
|
|
"sort"
|
|
"testing"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/influxdata/influxdb/v2/inmem"
|
|
"github.com/influxdata/influxdb/v2/kv"
|
|
"github.com/influxdata/influxdb/v2/kv/migration"
|
|
platformtesting "github.com/influxdata/influxdb/v2/testing"
|
|
)
|
|
|
|
func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func()) {
|
|
s := inmem.NewKVStore()
|
|
|
|
mustCreateBucket(t, s, f.Bucket)
|
|
|
|
err := s.Update(context.Background(), func(tx kv.Tx) error {
|
|
b, err := tx.Bucket(f.Bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, p := range f.Pairs {
|
|
if err := b.Put(p.Key, p.Value); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to put keys: %v", err)
|
|
}
|
|
return s, func() {}
|
|
}
|
|
|
|
func TestKVStore(t *testing.T) {
|
|
platformtesting.KVStore(initKVStore, t)
|
|
}
|
|
|
|
func TestKVStore_Buckets(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
buckets []string
|
|
want [][]byte
|
|
}{
|
|
{
|
|
name: "single bucket is returned if only one bucket is added",
|
|
buckets: []string{"b1"},
|
|
want: [][]byte{[]byte("b1")},
|
|
},
|
|
{
|
|
name: "multiple buckets are returned if multiple buckets added",
|
|
buckets: []string{"b1", "b2", "b3"},
|
|
want: [][]byte{[]byte("b1"), []byte("b2"), []byte("b3")},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
s := inmem.NewKVStore()
|
|
for _, b := range tt.buckets {
|
|
mustCreateBucket(t, s, []byte(b))
|
|
}
|
|
|
|
got := s.Buckets(context.Background())
|
|
sort.Slice(got, func(i, j int) bool {
|
|
return string(got[i]) < string(got[j])
|
|
})
|
|
if !reflect.DeepEqual(got, tt.want) {
|
|
t.Errorf("KVStore.Buckets() = %v, want %v", got, tt.want)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestKVStore_Bucket_CursorHintPredicate(t *testing.T) {
|
|
s := inmem.NewKVStore()
|
|
bucket := "urm"
|
|
mustCreateBucket(t, s, []byte(bucket))
|
|
|
|
fillBucket(t, s, bucket, 10)
|
|
|
|
t.Run("filter by key", func(t *testing.T) {
|
|
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
|
b, err := tx.Bucket([]byte(bucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cur, _ := b.Cursor(kv.WithCursorHintPredicate(func(key, _ []byte) bool {
|
|
return len(key) < 32 || string(key[16:]) == "8d5dc900004589c3"
|
|
}))
|
|
|
|
count := 0
|
|
for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
|
|
count++
|
|
}
|
|
|
|
if exp, got := 1, count; got != exp {
|
|
t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
|
|
return nil
|
|
})
|
|
})
|
|
|
|
t.Run("filter by value", func(t *testing.T) {
|
|
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
|
b, err := tx.Bucket([]byte(bucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cur, _ := b.Cursor(kv.WithCursorHintPredicate(func(_, val []byte) bool {
|
|
return len(val) < 32 || string(val[16:]) == "8d5dc900004589c3"
|
|
}))
|
|
|
|
count := 0
|
|
for k, _ := cur.First(); len(k) > 0; k, _ = cur.Next() {
|
|
count++
|
|
}
|
|
|
|
if exp, got := 1, count; got != exp {
|
|
t.Errorf("unexpected number of keys, -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
func openCursor(t testing.TB, s *inmem.KVStore, bucket string, fn func(cur kv.Cursor), hints ...kv.CursorHint) {
|
|
t.Helper()
|
|
|
|
_ = s.View(context.Background(), func(tx kv.Tx) error {
|
|
b, err := tx.Bucket([]byte(bucket))
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
cur, err := b.Cursor(hints...)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
if fn != nil {
|
|
fn(cur)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func BenchmarkKVStore_Bucket_Cursor(b *testing.B) {
|
|
scanAll := func(cur kv.Cursor) {
|
|
for k, v := cur.First(); k != nil; k, v = cur.Next() {
|
|
_, _ = k, v
|
|
}
|
|
}
|
|
|
|
searchKey := "629ffa00003dd2ce"
|
|
predicate := kv.CursorPredicateFunc(func(key, _ []byte) bool {
|
|
return len(key) < 32 || string(key[16:]) == searchKey
|
|
})
|
|
|
|
b.Run("16000 keys", func(b *testing.B) {
|
|
s := inmem.NewKVStore()
|
|
bucket := "urm"
|
|
mustCreateBucket(b, s, []byte(bucket))
|
|
fillBucket(b, s, bucket, 0)
|
|
|
|
b.Run("without hint", func(b *testing.B) {
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
openCursor(b, s, bucket, scanAll)
|
|
}
|
|
})
|
|
|
|
b.Run("with hint", func(b *testing.B) {
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
openCursor(b, s, bucket, scanAll, kv.WithCursorHintPredicate(predicate))
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
// sourceFile is the path of the fixture file that fillBucket reads, one key
// per line. It is a relative path, so it is resolved against the working
// directory of the test process.
const sourceFile = "kvdata/keys.txt"
|
|
|
|
func fillBucket(t testing.TB, s *inmem.KVStore, bucket string, lines int64) {
|
|
t.Helper()
|
|
err := s.Update(context.Background(), func(tx kv.Tx) error {
|
|
b, err := tx.Bucket([]byte(bucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
f, err := os.Open(sourceFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
if lines == 0 {
|
|
lines = int64(math.MaxInt64)
|
|
}
|
|
|
|
scan := bufio.NewScanner(bufio.NewReader(f))
|
|
for scan.Scan() {
|
|
var key []byte
|
|
key = append(key, scan.Bytes()...)
|
|
_ = b.Put(key, key)
|
|
lines--
|
|
if lines <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func mustCreateBucket(t testing.TB, store kv.SchemaStore, bucket []byte) {
|
|
t.Helper()
|
|
|
|
migrationName := fmt.Sprintf("create bucket %q", string(bucket))
|
|
|
|
if err := migration.CreateBuckets(migrationName, bucket).Up(context.Background(), store); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|