Add watchWithVersion interface for etcdKV (#6663)

* Add watchWithVersion interface for etcdKV

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Add unittest

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>

* Add LoadWithVersion interface

Signed-off-by: xiaocai2333 <cai.zhang@zilliz.com>
pull/6597/head^2
cai.zhang 2021-07-21 18:20:11 +08:00 committed by GitHub
parent 67c1fdd2c0
commit a3c662c7bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 0 deletions

View File

@ -139,6 +139,25 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
return result, nil
}
func (kv *EtcdKV) LoadWithVersion(key string) ([]string, []string, int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
}
return keys, values, resp.Header.Revision, nil
}
func (kv *EtcdKV) Save(key, value string) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -230,6 +249,12 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
return rch
}
func (kv *EtcdKV) WatchWithVersion(key string, revision int64) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
return rch
}
func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error {
ops := make([]clientv3.Op, 0, len(keys))
for _, k := range keys {

View File

@ -308,6 +308,52 @@ func TestEtcdKV_WatchPrefix(t *testing.T) {
assert.True(t, resp.Created)
}
func TestEtcdKV_LoadWithVersion(t *testing.T) {
cli, err := newEtcdClient()
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
key := "test-load-version"
err = etcdKV.Save(key, "v1")
assert.Nil(t, err)
_, values, _, err := etcdKV.LoadWithVersion(key)
assert.Nil(t, err)
assert.Equal(t, "v1", values[0])
}
func TestEtcdKV_WatchWithVersion(t *testing.T) {
cli, err := newEtcdClient()
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
key := "test-version"
err = etcdKV.Save(key, "v")
assert.Nil(t, err)
_, _, revision, _ := etcdKV.LoadWithVersion(key)
ch := etcdKV.WatchWithVersion(key, revision+1)
err = etcdKV.Save(key, "v2")
assert.Nil(t, err)
resp2 := <-ch
assert.Equal(t, 1, len(resp2.Events))
assert.Equal(t, "v2", string(resp2.Events[0].Kv.Value))
assert.Equal(t, revision+1, resp2.Header.Revision)
}
func TestEtcdKV_CompareAndSwap(t *testing.T) {
cli, err := newEtcdClient()
assert.Nil(t, err)