Add interface for EtcdKV

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2020-11-03 15:08:50 +08:00 committed by yefu.chen
parent e962b57fbb
commit 3de52a51c2
5 changed files with 237 additions and 5 deletions

1
.gitignore vendored
View File

@ -52,3 +52,4 @@ lib/
*.pyc
.DS_Store
*.swp

View File

@ -510,12 +510,12 @@ func NewIdAllocator(ctx context.Context) *IdAllocator
```go
type KVBase interface {
Load(key string) (string, error)
MultiLoad(keys []string)
MultiLoad(keys []string) ([]string, error)
Save(key, value string) error
MultiSave(kvs map[string]string)
MultiSave(kvs map[string]string) error
Remove(key string) error
MultiRemove(keys []string)
MultiSaveAndRemove(saves map[string]string, removals []string)
MultiRemove(keys []string) error
MultiSaveAndRemove(saves map[string]string, removals []string) error
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ( []string, []string, error)

View File

@ -62,6 +62,38 @@ func (kv *EtcdKV) Load(key string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
ops := make([]clientv3.Op, 0, len(keys))
for _, key_load := range keys {
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, key_load)))
}
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return []string{}, err
}
result := make([]string, 0, len(keys))
invalid := make([]string, 0, len(keys))
for index, rp := range resp.Responses {
if rp.GetResponseRange().Kvs == nil {
invalid = append(invalid, keys[index])
result = append(result, "")
}
for _, ev := range rp.GetResponseRange().Kvs {
log.Printf("MultiLoad: %s -> %s\n", string(ev.Key), string(ev.Value))
result = append(result, string(ev.Value))
}
}
if len(invalid) != 0 {
log.Printf("MultiLoad: there are invalid keys: %s", invalid)
err = errors.Errorf("there are invalid keys: %s", invalid)
return result, err
}
return result, nil
}
func (kv *EtcdKV) Save(key, value string) error {
key = path.Join(kv.rootPath, key)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
@ -69,6 +101,17 @@ func (kv *EtcdKV) Save(key, value string) error {
return err
}
func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
ops := make([]clientv3.Op, 0, len(kvs))
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
}
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
}
func (kv *EtcdKV) Remove(key string) error {
key = path.Join(kv.rootPath, key)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
@ -76,6 +119,33 @@ func (kv *EtcdKV) Remove(key string) error {
return err
}
func (kv *EtcdKV) MultiRemove(keys []string) error {
ops := make([]clientv3.Op, 0, len(keys))
for _, key := range keys {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, key)))
}
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
}
func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
ops := make([]clientv3.Op, 0, len(saves))
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
}
for _, key_delete := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, key_delete)))
}
log.Printf("MultiSaveAndRemove")
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
return err
}
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key)

View File

@ -1,6 +1,7 @@
package kv
import (
"context"
"path"
"testing"
@ -11,8 +12,12 @@ import (
func TestEtcdKV_Load(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
assert.Nil(t, err)
kv := NewEtcdKV(cli, "/etcd/test/root")
rootpath := "/etcd/test/root"
kv := NewEtcdKV(cli, rootpath)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
defer kv.Close()
defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix())
err = kv.Save("abc", "123")
assert.Nil(t, err)
@ -32,4 +37,156 @@ func TestEtcdKV_Load(t *testing.T) {
assert.Equal(t, keys[1], path.Join(kv.rootPath, "abcd"))
assert.Equal(t, vals[0], "123")
assert.Equal(t, vals[1], "1234")
err = kv.Save("key_1", "123")
assert.Nil(t, err)
err = kv.Save("key_2", "456")
assert.Nil(t, err)
err = kv.Save("key_3", "789")
assert.Nil(t, err)
keys = []string{"key_1", "key_100"}
vals, err = kv.MultiLoad(keys)
assert.NotNil(t, err)
assert.Equal(t, len(vals), len(keys))
assert.Equal(t, vals[0], "123")
assert.Empty(t, vals[1])
keys = []string{"key_1", "key_2"}
vals, err = kv.MultiLoad(keys)
assert.Nil(t, err)
assert.Equal(t, len(vals), len(keys))
assert.Equal(t, vals[0], "123")
assert.Equal(t, vals[1], "456")
}
func TestEtcdKV_MultiSave(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
assert.Nil(t, err)
rootpath := "/etcd/test/root"
kv := NewEtcdKV(cli, rootpath)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
defer kv.Close()
defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix())
err = kv.Save("key_1", "111")
assert.Nil(t, err)
kvs := map[string]string{
"key_1": "123",
"key_2": "456",
}
err = kv.MultiSave(kvs)
assert.Nil(t, err)
val, err := kv.Load("key_1")
assert.Nil(t, err)
assert.Equal(t, val, "123")
}
func TestEtcdKV_Remove(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
assert.Nil(t, err)
rootpath := "/etcd/test/root"
kv := NewEtcdKV(cli, rootpath)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
defer kv.Close()
defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix())
err = kv.Save("key_1", "123")
assert.Nil(t, err)
err = kv.Save("key_2", "456")
assert.Nil(t, err)
val, err := kv.Load("key_1")
assert.Nil(t, err)
assert.Equal(t, val, "123")
// delete "key_1"
err = kv.Remove("key_1")
assert.Nil(t, err)
val, err = kv.Load("key_1")
assert.Error(t, err)
assert.Empty(t, val)
val, err = kv.Load("key_2")
assert.Nil(t, err)
assert.Equal(t, val, "456")
keys, vals, err := kv.LoadWithPrefix("key")
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 1)
assert.Equal(t, keys[0], path.Join(kv.rootPath, "key_2"))
assert.Equal(t, vals[0], "456")
// MultiRemove
err = kv.Save("key_1", "111")
assert.Nil(t, err)
kvs := map[string]string{
"key_1": "123",
"key_2": "456",
"key_3": "789",
"key_4": "012",
}
err = kv.MultiSave(kvs)
assert.Nil(t, err)
val, err = kv.Load("key_1")
assert.Nil(t, err)
assert.Equal(t, val, "123")
val, err = kv.Load("key_3")
assert.Nil(t, err)
assert.Equal(t, val, "789")
keys = []string{"key_1", "key_2", "key_3"}
err = kv.MultiRemove(keys)
assert.Nil(t, err)
val, err = kv.Load("key_1")
assert.Error(t, err)
assert.Empty(t, val)
}
func TestEtcdKV_MultiSaveAndRemove(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
assert.Nil(t, err)
rootpath := "/etcd/test/root"
kv := NewEtcdKV(cli, rootpath)
ctx, _ := context.WithTimeout(context.TODO(), requestTimeout)
defer kv.Close()
defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix())
err = kv.Save("key_1", "123")
assert.Nil(t, err)
err = kv.Save("key_2", "456")
assert.Nil(t, err)
err = kv.Save("key_3", "789")
assert.Nil(t, err)
kvs := map[string]string{
"key_1": "111",
"key_2": "444",
}
keys := []string{"key_3"}
err = kv.MultiSaveAndRemove(kvs, keys)
assert.Nil(t, err)
val, err := kv.Load("key_1")
assert.Nil(t, err)
assert.Equal(t, val, "111")
val, err = kv.Load("key_2")
assert.Nil(t, err)
assert.Equal(t, val, "444")
val, err = kv.Load("key_3")
assert.Error(t, err)
assert.Empty(t, val)
}

View File

@ -4,9 +4,13 @@ import "go.etcd.io/etcd/clientv3"
type Base interface {
Load(key string) (string, error)
MultiLoad(keys []string) ([]string, error)
Save(key, value string) error
MultiSave(kvs map[string]string) error
Remove(key string) error
MultiRemove(keys []string) error
Watch(key string) clientv3.WatchChan
MultiSaveAndRemove(saves map[string]string, removals []string) error
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ([]string, []string, error)
Close()