Add ByteSlice Method for Etcd-kv (#15738)

Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
pull/15748/head
Letian Jiang 2022-02-25 11:25:53 +08:00 committed by GitHub
parent fcf0887d92
commit b56ec7ea97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 605 additions and 1 deletions

View File

@ -79,6 +79,27 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
return keys, values, nil
}
// LoadBytesWithPrefix returns all the keys and values with the given key prefix.
func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
keys := make([]string, 0, resp.Count)
values := make([][]byte, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, kv.Value)
}
CheckElapseAndWarn(start, "Slow etcd operation load with prefix")
return keys, values, nil
}
// LoadWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
start := time.Now()
@ -102,6 +123,29 @@ func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, erro
return keys, values, versions, nil
}
// LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, err
}
keys := make([]string, 0, resp.Count)
values := make([][]byte, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, kv.Value)
versions = append(versions, kv.Version)
}
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2")
return keys, values, versions, nil
}
// Load returns value of the key.
func (kv *EtcdKV) Load(key string) (string, error) {
start := time.Now()
@ -119,6 +163,23 @@ func (kv *EtcdKV) Load(key string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
// LoadBytes returns value of the key.
func (kv *EtcdKV) LoadBytes(key string) ([]byte, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key)
if err != nil {
return []byte{}, err
}
if resp.Count <= 0 {
return []byte{}, fmt.Errorf("there is no value on key = %s", key)
}
CheckElapseAndWarn(start, "Slow etcd operation load")
return resp.Kvs[0].Value, nil
}
// MultiLoad gets the values of the keys in a transaction.
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
start := time.Now()
@ -154,6 +215,41 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
return result, nil
}
// MultiLoadBytes gets the values of the keys in a transaction.
func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
start := time.Now()
ops := make([]clientv3.Op, 0, len(keys))
for _, keyLoad := range keys {
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
}
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return [][]byte{}, err
}
result := make([][]byte, 0, len(keys))
invalid := make([]string, 0, len(keys))
for index, rp := range resp.Responses {
if rp.GetResponseRange().Kvs == nil || len(rp.GetResponseRange().Kvs) == 0 {
invalid = append(invalid, keys[index])
result = append(result, []byte{})
}
for _, ev := range rp.GetResponseRange().Kvs {
result = append(result, ev.Value)
}
}
if len(invalid) != 0 {
log.Warn("MultiLoad: there are invalid keys", zap.Strings("keys", invalid))
err = fmt.Errorf("there are invalid keys: %s", invalid)
return result, err
}
CheckElapseAndWarn(start, "Slow etcd operation multi load")
return result, nil
}
// LoadWithRevision returns keys, values and revision with given key prefix.
func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
start := time.Now()
@ -175,6 +271,27 @@ func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error
return keys, values, resp.Header.Revision, nil
}
// LoadBytesWithRevision returns keys, values and revision with given key prefix.
func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([][]byte, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, kv.Value)
}
CheckElapseAndWarn(start, "Slow etcd operation load with revision")
return keys, values, resp.Header.Revision, nil
}
// Save saves the key-value pair.
func (kv *EtcdKV) Save(key, value string) error {
start := time.Now()
@ -186,6 +303,17 @@ func (kv *EtcdKV) Save(key, value string) error {
return err
}
// SaveBytes saves the key-value pair.
func (kv *EtcdKV) SaveBytes(key string, value []byte) error {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, string(value))
CheckElapseAndWarn(start, "Slow etcd operation save")
return err
}
// SaveWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
start := time.Now()
@ -197,6 +325,17 @@ func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
return err
}
// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id))
CheckElapseAndWarn(start, "Slow etcd operation save with lease")
return err
}
// MultiSave saves the key-value pairs in a transaction.
func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
start := time.Now()
@ -213,6 +352,22 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
return err
}
// MultiSaveBytes saves the key-value pairs in a transaction.
func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
start := time.Now()
ops := make([]clientv3.Op, 0, len(kvs))
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
}
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
CheckElapseAndWarn(start, "Slow etcd operation multi save")
return err
}
// RemoveWithPrefix removes the keys with given prefix.
func (kv *EtcdKV) RemoveWithPrefix(prefix string) error {
start := time.Now()
@ -273,6 +428,26 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
return err
}
// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error {
start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
}
for _, keyDelete := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
}
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
CheckElapseAndWarn(start, "Slow etcd operation multi save and remove")
return err
}
// Watch starts watching a key, returns a watch channel.
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan {
start := time.Now()
@ -336,6 +511,26 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
return err
}
// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error {
start := time.Now()
ops := make([]clientv3.Op, 0, len(saves))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
}
for _, keyDelete := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix()))
}
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix")
return err
}
// Grant creates a new lease implemented in etcd grant interface.
func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
start := time.Now()
@ -378,6 +573,28 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv
return nil
}
// CompareValueAndSwapBytes compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
clientv3.Compare(
clientv3.Value(path.Join(kv.rootPath, key)),
"=",
string(value))).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)
}
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap")
return nil
}
// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) error {
@ -401,6 +618,29 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string,
return nil
}
// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) error {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
clientv3.Compare(
clientv3.Version(path.Join(kv.rootPath, key)),
"=",
source)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+
" source version: %d, target version: %s", key, source, target)
}
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap")
return nil
}
// CheckElapseAndWarn checks the elapsed time and warns if it is too long.
func CheckElapseAndWarn(start time.Time, message string) bool {
elapsed := time.Since(start)

View File

@ -150,6 +150,123 @@ func TestEtcdKV_Load(te *testing.T) {
}
})
te.Run("EtcdKV SaveAndLoadBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/saveandloadbytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
err = etcdKV.RemoveWithPrefix("")
require.NoError(t, err)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
saveAndLoadTests := []struct {
key string
value string
}{
{"test1", "value1"},
{"test2", "value2"},
{"test1/a", "value_a"},
{"test1/b", "value_b"},
}
for i, test := range saveAndLoadTests {
if i < 4 {
err = etcdKV.SaveBytes(test.key, []byte(test.value))
assert.NoError(t, err)
}
val, err := etcdKV.LoadBytes(test.key)
assert.NoError(t, err)
assert.Equal(t, test.value, string(val))
}
invalidLoadTests := []struct {
invalidKey string
}{
{"t"},
{"a"},
{"test1a"},
}
for _, test := range invalidLoadTests {
val, err := etcdKV.LoadBytes(test.invalidKey)
assert.Error(t, err)
assert.Zero(t, string(val))
}
loadPrefixTests := []struct {
prefix string
expectedKeys []string
expectedValues []string
expectedError error
}{
{"test", []string{
etcdKV.GetPath("test1"),
etcdKV.GetPath("test2"),
etcdKV.GetPath("test1/a"),
etcdKV.GetPath("test1/b")}, []string{"value1", "value2", "value_a", "value_b"}, nil},
{"test1", []string{
etcdKV.GetPath("test1"),
etcdKV.GetPath("test1/a"),
etcdKV.GetPath("test1/b")}, []string{"value1", "value_a", "value_b"}, nil},
{"test2", []string{etcdKV.GetPath("test2")}, []string{"value2"}, nil},
{"", []string{
etcdKV.GetPath("test1"),
etcdKV.GetPath("test2"),
etcdKV.GetPath("test1/a"),
etcdKV.GetPath("test1/b")}, []string{"value1", "value2", "value_a", "value_b"}, nil},
{"test1/a", []string{etcdKV.GetPath("test1/a")}, []string{"value_a"}, nil},
{"a", []string{}, []string{}, nil},
{"root", []string{}, []string{}, nil},
{"/etcd/test/root", []string{}, []string{}, nil},
}
for _, test := range loadPrefixTests {
actualKeys, actualValues, err := etcdKV.LoadBytesWithPrefix(test.prefix)
actualStringValues := make([]string, len(actualValues))
for i := range actualValues {
actualStringValues[i] = string(actualValues[i])
}
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualStringValues)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, err := etcdKV.LoadBytesWithPrefix2(test.prefix)
actualStringValues = make([]string, len(actualValues))
for i := range actualValues {
actualStringValues[i] = string(actualValues[i])
}
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualStringValues)
assert.NotZero(t, versions)
assert.Equal(t, test.expectedError, err)
}
removeTests := []struct {
validKey string
invalidKey string
}{
{"test1", "abc"},
{"test1/a", "test1/lskfjal"},
{"test1/b", "test1/b"},
{"test2", "-"},
}
for _, test := range removeTests {
err = etcdKV.Remove(test.validKey)
assert.NoError(t, err)
_, err = etcdKV.Load(test.validKey)
assert.Error(t, err)
err = etcdKV.Remove(test.validKey)
assert.NoError(t, err)
err = etcdKV.Remove(test.invalidKey)
assert.NoError(t, err)
}
})
te.Run("EtcdKV LoadWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadWithRevision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -194,6 +311,54 @@ func TestEtcdKV_Load(te *testing.T) {
})
te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadWithRevision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
prepareKV := []struct {
inKey string
inValue string
}{
{"a", "a_version1"},
{"b", "b_version2"},
{"a", "a_version3"},
{"c", "c_version4"},
{"a/suba", "a_version5"},
}
for _, test := range prepareKV {
err = etcdKV.SaveBytes(test.inKey, []byte(test.inValue))
require.NoError(t, err)
}
loadWithRevisionTests := []struct {
inKey string
expectedKeyNo int
expectedValues []string
}{
{"a", 2, []string{"a_version3", "a_version5"}},
{"b", 1, []string{"b_version2"}},
{"c", 1, []string{"c_version4"}},
}
for _, test := range loadWithRevisionTests {
keys, values, revision, err := etcdKV.LoadBytesWithRevision(test.inKey)
assert.NoError(t, err)
assert.Equal(t, test.expectedKeyNo, len(keys))
stringValues := make([]string, len(values))
for i := range values {
stringValues[i] = string(values[i])
}
assert.ElementsMatch(t, test.expectedValues, stringValues)
assert.NotZero(t, revision)
}
})
te.Run("EtcdKV MultiSaveAndMultiLoad", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_save_and_multi_load"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -302,6 +467,128 @@ func TestEtcdKV_Load(te *testing.T) {
assert.Empty(t, vs)
})
te.Run("EtcdKV MultiSaveBytesAndMultiLoadBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_save_bytes_and_multi_load_bytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
multiSaveTests := map[string]string{
"key_1": "value_1",
"key_2": "value_2",
"key_3/a": "value_3a",
"multikey_1": "multivalue_1",
"multikey_2": "multivalue_2",
"_": "other",
}
multiSaveBytesTests := make(map[string][]byte)
for k, v := range multiSaveTests {
multiSaveBytesTests[k] = []byte(v)
}
err = etcdKV.MultiSaveBytes(multiSaveBytesTests)
assert.NoError(t, err)
for k, v := range multiSaveTests {
actualV, err := etcdKV.LoadBytes(k)
assert.NoError(t, err)
assert.Equal(t, v, string(actualV))
}
multiLoadTests := []struct {
inputKeys []string
expectedValues []string
}{
{[]string{"key_1"}, []string{"value_1"}},
{[]string{"key_1", "key_2", "key_3/a"}, []string{"value_1", "value_2", "value_3a"}},
{[]string{"multikey_1", "multikey_2"}, []string{"multivalue_1", "multivalue_2"}},
{[]string{"_"}, []string{"other"}},
}
for _, test := range multiLoadTests {
vs, err := etcdKV.MultiLoadBytes(test.inputKeys)
stringVs := make([]string, len(vs))
for i := range vs {
stringVs[i] = string(vs[i])
}
assert.NoError(t, err)
assert.Equal(t, test.expectedValues, stringVs)
}
invalidMultiLoad := []struct {
invalidKeys []string
expectedValues []string
}{
{[]string{"a", "key_1"}, []string{"", "value_1"}},
{[]string{".....", "key_1"}, []string{"", "value_1"}},
{[]string{"*********"}, []string{""}},
{[]string{"key_1", "1"}, []string{"value_1", ""}},
}
for _, test := range invalidMultiLoad {
vs, err := etcdKV.MultiLoadBytes(test.invalidKeys)
stringVs := make([]string, len(vs))
for i := range vs {
stringVs[i] = string(vs[i])
}
assert.Error(t, err)
assert.Equal(t, test.expectedValues, stringVs)
}
removeWithPrefixTests := []string{
"key_1",
"multi",
}
for _, k := range removeWithPrefixTests {
err = etcdKV.RemoveWithPrefix(k)
assert.NoError(t, err)
ks, vs, err := etcdKV.LoadBytesWithPrefix(k)
assert.Empty(t, ks)
assert.Empty(t, vs)
assert.NoError(t, err)
}
multiRemoveTests := []string{
"key_2",
"key_3/a",
"multikey_2",
"_",
}
err = etcdKV.MultiRemove(multiRemoveTests)
assert.NoError(t, err)
ks, vs, err := etcdKV.LoadBytesWithPrefix("")
assert.NoError(t, err)
assert.Empty(t, ks)
assert.Empty(t, vs)
multiSaveAndRemoveTests := []struct {
multiSaves map[string][]byte
multiRemoves []string
}{
{map[string][]byte{"key_1": []byte("value_1")}, []string{}},
{map[string][]byte{"key_2": []byte("value_2")}, []string{"key_1"}},
{map[string][]byte{"key_3/a": []byte("value_3a")}, []string{"key_2"}},
{map[string][]byte{"multikey_1": []byte("multivalue_1")}, []string{}},
{map[string][]byte{"multikey_2": []byte("multivalue_2")}, []string{"multikey_1", "key_3/a"}},
{make(map[string][]byte), []string{"multikey_2"}},
}
for _, test := range multiSaveAndRemoveTests {
err = etcdKV.MultiSaveBytesAndRemove(test.multiSaves, test.multiRemoves)
assert.NoError(t, err)
}
ks, vs, err = etcdKV.LoadBytesWithPrefix("")
assert.NoError(t, err)
assert.Empty(t, ks)
assert.Empty(t, vs)
})
te.Run("EtcdKV MultiRemoveWithPrefix", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_remove_with_prefix"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -405,7 +692,7 @@ func TestEtcdKV_Load(te *testing.T) {
})
te.Run("Etcd Revision", func(t *testing.T) {
rootPath := "/etcd/test/root/watch"
rootPath := "/etcd/test/root/revision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
@ -453,6 +740,55 @@ func TestEtcdKV_Load(te *testing.T) {
assert.Error(t, err)
})
te.Run("Etcd Revision Bytes", func(t *testing.T) {
rootPath := "/etcd/test/root/revision_bytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
revisionTests := []struct {
inKey string
fistValue []byte
secondValue []byte
}{
{"a", []byte("v1"), []byte("v11")},
{"y", []byte("v2"), []byte("v22")},
{"z", []byte("v3"), []byte("v33")},
}
for _, test := range revisionTests {
err = etcdKV.SaveBytes(test.inKey, test.fistValue)
require.NoError(t, err)
_, _, revision, _ := etcdKV.LoadBytesWithRevision(test.inKey)
ch := etcdKV.WatchWithRevision(test.inKey, revision+1)
err = etcdKV.SaveBytes(test.inKey, test.secondValue)
require.NoError(t, err)
resp := <-ch
assert.Equal(t, 1, len(resp.Events))
assert.Equal(t, string(test.secondValue), string(resp.Events[0].Kv.Value))
assert.Equal(t, revision+1, resp.Header.Revision)
}
err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
value, err := etcdKV.LoadBytes("a/b/c")
assert.NoError(t, err)
assert.Equal(t, string(value), "1")
err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.Error(t, err)
err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.NoError(t, err)
err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.Error(t, err)
})
te.Run("Etcd Lease", func(t *testing.T) {
rootPath := "/etcd/test/root/lease"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -480,6 +816,34 @@ func TestEtcdKV_Load(te *testing.T) {
}
})
te.Run("Etcd Lease Bytes", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_bytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
leaseID, err := etcdKV.Grant(10)
assert.NoError(t, err)
etcdKV.KeepAlive(leaseID)
tests := map[string][]byte{
"a/b": []byte("v1"),
"a/b/c": []byte("v2"),
"x": []byte("v3"),
}
for k, v := range tests {
err = etcdKV.SaveBytesWithLease(k, v, leaseID)
assert.NoError(t, err)
err = etcdKV.SaveBytesWithLease(k, v, clientv3.LeaseID(999))
assert.Error(t, err)
}
})
}
func TestElapse(t *testing.T) {