package testing import ( "bytes" "context" "fmt" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/kv" ) // KVStoreFields are background data that has to be set before // the test runs. type KVStoreFields struct { Bucket []byte Pairs []kv.Pair } // KVStore tests the key value store contract func KVStore( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { tests := []struct { name string fn func( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) }{ { name: "Get", fn: KVGet, }, { name: "Put", fn: KVPut, }, { name: "Delete", fn: KVDelete, }, { name: "Cursor", fn: KVCursor, }, { name: "CursorWithHints", fn: KVCursorWithHints, }, { name: "View", fn: KVView, }, { name: "Update", fn: KVUpdate, }, { name: "ConcurrentUpdate", fn: KVConcurrentUpdate, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.fn(init, t) }) } } // KVGet tests the get method contract for the key value store. func KVGet( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte key []byte } type wants struct { err error val []byte } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "get key", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), }, wants: wants{ val: []byte("world"), }, }, { name: "get missing key", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{}, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), }, wants: wants{ err: kv.ErrKeyNotFound, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, close := init(tt.fields, t) defer close() err := s.View(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } val, err := b.Get(tt.args.key) if (err != nil) != (tt.wants.err != nil) { t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) return err } if err != nil && tt.wants.err != nil { if err.Error() != tt.wants.err.Error() { t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) return err } } if want, got := tt.wants.val, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } }) } } // KVPut tests the get method contract for the key value store. func KVPut( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte key []byte val []byte } type wants struct { err error } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "put pair", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{}, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), val: []byte("world"), }, wants: wants{}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, close := init(tt.fields, t) defer close() err := s.Update(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } { err := b.Put(tt.args.key, tt.args.val) if (err != nil) != (tt.wants.err != nil) { t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) return err } if err != nil && tt.wants.err != nil { if err.Error() != tt.wants.err.Error() { t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) return err } } val, err := b.Get(tt.args.key) if err != nil { t.Errorf("unexpected error retrieving value: %v", err) return err } if want, got := tt.args.val, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } }) } } // KVDelete tests the delete method contract for the key value store. func KVDelete( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte key []byte } type wants struct { err error } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "delete key", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), }, wants: wants{}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, close := init(tt.fields, t) defer close() err := s.Update(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } { err := b.Delete(tt.args.key) if (err != nil) != (tt.wants.err != nil) { t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) return err } if err != nil && tt.wants.err != nil { if err.Error() != tt.wants.err.Error() { t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) return err } } if _, err := b.Get(tt.args.key); err != kv.ErrKeyNotFound { t.Errorf("expected key not found error got %v", err) return err } } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } }) } } // KVCursor tests the cursor contract for the key value store. func KVCursor( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte seek []byte } type wants struct { err error first kv.Pair last kv.Pair seek kv.Pair next kv.Pair prev kv.Pair } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "basic cursor", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("a"), Value: []byte("1"), }, { Key: []byte("ab"), Value: []byte("2"), }, { Key: []byte("abc"), Value: []byte("3"), }, { Key: []byte("abcd"), Value: []byte("4"), }, { Key: []byte("abcde"), Value: []byte("5"), }, { Key: []byte("bcd"), Value: []byte("6"), }, { Key: []byte("cd"), Value: []byte("7"), }, }, }, args: args{ bucket: []byte("bucket"), seek: []byte("abc"), }, wants: wants{ first: kv.Pair{ Key: []byte("a"), Value: []byte("1"), }, last: kv.Pair{ Key: []byte("cd"), Value: []byte("7"), }, seek: kv.Pair{ Key: []byte("abc"), Value: []byte("3"), }, next: kv.Pair{ Key: []byte("abcd"), Value: []byte("4"), }, prev: kv.Pair{ Key: []byte("abc"), Value: []byte("3"), }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, close := init(tt.fields, t) defer close() err := s.View(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } cur, err := b.Cursor() if (err != nil) != (tt.wants.err != nil) { t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) return err } if err != nil && tt.wants.err != nil { if err.Error() != tt.wants.err.Error() { t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) return err } } { key, val := cur.First() if want, got := tt.wants.first.Key, key; !bytes.Equal(want, got) { t.Errorf("exptected to get key %s got %s", string(want), string(got)) return err } if want, got := tt.wants.first.Value, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } } { key, val := cur.Last() if want, got := tt.wants.last.Key, key; !bytes.Equal(want, got) { t.Errorf("exptected to get key %s got %s", string(want), string(got)) return err } if want, got := tt.wants.last.Value, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } } { key, val := cur.Seek(tt.args.seek) if want, got := tt.wants.seek.Key, key; !bytes.Equal(want, got) { t.Errorf("exptected to get key %s got %s", string(want), string(got)) return err } if want, got := tt.wants.seek.Value, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } } { key, val := cur.Next() if want, got := tt.wants.next.Key, key; !bytes.Equal(want, got) { t.Errorf("exptected to get key %s got %s", string(want), string(got)) return err } if want, got := tt.wants.next.Value, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } } { key, val := cur.Prev() if want, got := tt.wants.prev.Key, key; !bytes.Equal(want, got) { t.Errorf("exptected to get key %s got %s", string(want), string(got)) return err } if want, got := tt.wants.prev.Value, val; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } }) } } // KVCursor tests the cursor contract for the key value store. func KVCursorWithHints( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { seek string until string hints []kv.CursorHint } pairs := func(keys ...string) []kv.Pair { p := make([]kv.Pair, len(keys)) for i, k := range keys { p[i].Key = []byte(k) p[i].Value = []byte("val:" + k) } return p } tests := []struct { name string fields KVStoreFields args args exp []string }{ { name: "no hints", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: pairs( "aa/00", "aa/01", "aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00", "bbb/01", "bbb/02"), }, args: args{ seek: "aaa", until: "bbb/00", }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, }, { name: "prefix hint", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: pairs( "aa/00", "aa/01", "aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00", "bbb/01", "bbb/02"), }, args: args{ seek: "aaa", until: "aaa/03", hints: []kv.CursorHint{kv.WithCursorHintPrefix("aaa/")}, }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, }, { name: "start hint", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: pairs( "aa/00", "aa/01", "aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00", "bbb/01", "bbb/02"), }, args: args{ seek: "aaa", until: "bbb/00", hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")}, }, exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, fin := init(tt.fields, t) defer fin() err := s.View(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket([]byte("bucket")) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } cur, err := b.Cursor(tt.args.hints...) if err != nil { t.Errorf("unexpected error: %v", err) return err } var got []string k, _ := cur.Seek([]byte(tt.args.seek)) for len(k) > 0 { got = append(got, string(k)) if string(k) == tt.args.until { break } k, _ = cur.Next() } if exp := tt.exp; !cmp.Equal(got, exp) { t.Errorf("unexpected cursor values: -got/+exp\n%v", cmp.Diff(got, exp)) } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } }) } } // KVView tests the view method contract for the key value store. func KVView( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte key []byte // If len(value) == 0 the test will not attempt a put value []byte // If true, the test will attempt to delete the provided key delete bool } type wants struct { value []byte } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "basic view", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("cruel world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), }, wants: wants{ value: []byte("cruel world"), }, }, { name: "basic view with delete", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("cruel world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), delete: true, }, wants: wants{ value: []byte("cruel world"), }, }, { name: "basic view with put", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("cruel world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), value: []byte("world"), delete: true, }, wants: wants{ value: []byte("cruel world"), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, close := init(tt.fields, t) defer close() err := s.View(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } if len(tt.args.value) != 0 { err := b.Put(tt.args.key, tt.args.value) if err == nil { return fmt.Errorf("expected transaction to fail") } if err != kv.ErrTxNotWritable { return err } return nil } value, err := b.Get(tt.args.key) if err != nil { return err } if want, got := tt.wants.value, value; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } if tt.args.delete { err := b.Delete(tt.args.key) if err == nil { return fmt.Errorf("expected transaction to fail") } if err != kv.ErrTxNotWritable { return err } return nil } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } }) } } // KVUpdate tests the update method contract for the key value store. func KVUpdate( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte key []byte value []byte delete bool } type wants struct { value []byte } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "basic update", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("cruel world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), value: []byte("world"), }, wants: wants{ value: []byte("world"), }, }, { name: "basic update with delete", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("cruel world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), value: []byte("world"), delete: true, }, wants: wants{}, }, // TODO: add case with failed update transaction that doesn't apply all of the changes. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s, close := init(tt.fields, t) defer close() { err := s.Update(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } if len(tt.args.value) != 0 { err := b.Put(tt.args.key, tt.args.value) if err != nil { return err } } if tt.args.delete { err := b.Delete(tt.args.key) if err != nil { return err } } value, err := b.Get(tt.args.key) if tt.args.delete { if err != kv.ErrKeyNotFound { return fmt.Errorf("expected key not found") } return nil } else if err != nil { return err } if want, got := tt.wants.value, value; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } return nil }) if err != nil { t.Fatalf("error during update transaction: %v", err) } } { err := s.View(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } value, err := b.Get(tt.args.key) if tt.args.delete { if err != kv.ErrKeyNotFound { return fmt.Errorf("expected key not found") } } else if err != nil { return err } if want, got := tt.wants.value, value; !bytes.Equal(want, got) { t.Errorf("exptected to get value %s got %s", string(want), string(got)) return err } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } } }) } } // KVConcurrentUpdate tests concurrent calls to update. func KVConcurrentUpdate( init func(KVStoreFields, *testing.T) (kv.Store, func()), t *testing.T, ) { type args struct { bucket []byte key []byte valueA []byte valueB []byte } type wants struct { value []byte } tests := []struct { name string fields KVStoreFields args args wants wants }{ { name: "basic concurrent update", fields: KVStoreFields{ Bucket: []byte("bucket"), Pairs: []kv.Pair{ { Key: []byte("hello"), Value: []byte("cruel world"), }, }, }, args: args{ bucket: []byte("bucket"), key: []byte("hello"), valueA: []byte("world"), valueB: []byte("darkness my new friend"), }, wants: wants{ value: []byte("darkness my new friend"), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Skip("https://github.com/influxdata/platform/issues/2371") s, closeFn := init(tt.fields, t) defer closeFn() errCh := make(chan error) var fn = func(v []byte) { err := s.Update(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { return err } if err := b.Put(tt.args.key, v); err != nil { return err } return nil }) if err != nil { errCh <- fmt.Errorf("error during update transaction: %v", err) } else { errCh <- nil } } go fn(tt.args.valueA) // To ensure that a is scheduled before b time.Sleep(time.Millisecond) go fn(tt.args.valueB) count := 0 for err := range errCh { count++ if err != nil { t.Fatal(err) } if count == 2 { break } } close(errCh) { err := s.View(context.Background(), func(tx kv.Tx) error { b, err := tx.Bucket(tt.args.bucket) if err != nil { t.Errorf("unexpected error retrieving bucket: %v", err) return err } deadline := time.Now().Add(1 * time.Second) var returnErr error for { if time.Now().After(deadline) { break } value, err := b.Get(tt.args.key) if err != nil { return err } if want, got := tt.wants.value, value; !bytes.Equal(want, got) { returnErr = fmt.Errorf("exptected to get value %s got %s", string(want), string(got)) } else { returnErr = nil break } } if returnErr != nil { return returnErr } return nil }) if err != nil { t.Fatalf("error during view transaction: %v", err) } } }) } }