Add A WalkWithPrefix API for MetaKv interface (#21585)

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/21601/head
jaime 2023-01-09 20:41:38 +08:00 committed by GitHub
parent 080bed9083
commit 7154b5377e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1485 additions and 160 deletions

View File

@ -1,109 +0,0 @@
# metaSnapShot
`metaSnapShot` enables `RootCoord` to query historical meta based on timestamp, it provides `Key-Vaule` interface. Take an example to illustrate what `metaSnapShot` is. The following figure shows a series of operations happened on the timeline.
![snap_shot](./graphs/snapshot_1.png)
| Timestamp | Operation |
|-----------|-----------|
| 100 | Set A=1 |
| 200 | Set B=2 |
| 300 | Set C=3 |
| 400 | Set A=10 |
| 500 | Delete B |
| 600 | Delete C |
Now assuming the Wall-Clock is `Timestamp=700`, so `B` should have been deleted from the system. But I want to know the value of `B` at `Timesamp=450`, how to do it? `metaSnapShot` is invented to solve this problem.
We need to briefly introduce `etcd`'s `MVCC` before `metaSnapShot`. Here is the test program:
```go
package etcdkv
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/clientv3"
)
func TestMVCC(t *testing.T) {
addr := []string{"127.0.0.1:2379"}
cli, err := clientv3.New(clientv3.Config{Endpoints: addr})
assert.Nil(t, err)
assert.NotNil(t, cli)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
testKey := "test-key"
rsp0, err := cli.Delete(ctx, testKey)
assert.Nil(t, err)
t.Logf("revision:%d", rsp0.Header.Revision)
rsp1, err := cli.Put(ctx, testKey, "value1")
assert.Nil(t, err)
t.Logf("revision:%d,value1", rsp1.Header.Revision)
rsp2, err := cli.Put(ctx, testKey, "value2")
assert.Nil(t, err)
t.Logf("revision:%d,value2", rsp2.Header.Revision)
rsp3, err := cli.Get(ctx, testKey, clientv3.WithRev(rsp1.Header.Revision))
assert.Nil(t, err)
t.Logf("get at revision:%d, value=%s", rsp1.Header.Revision, string(rsp3.Kvs[0].Value))
}
```
The output of above test program should look like this:
```text
=== RUN TestMVCC
etcd_mvcc_test.go:23: revision:401
etcd_mvcc_test.go:27: revision:402,value1
etcd_mvcc_test.go:31: revision:403,value2
etcd_mvcc_test.go:35: get at revision:402, value=value1
--- PASS: TestMVCC (0.01s)
```
In `etcd`, each writes operation would add `1` to `Revision`. So if we specify the `Revision` value at query, we can get the historical value under that `Revision`.
`metaSnapShot` is based on this feature of `etcd`. We will write an extra `Timestamp` on each write operation. `etcd`'s `Txn` makes sure that the `Timestamp` would have the same `Revision` with user data.
When querying, `metaSnapShot` will find an appropriate `Revision` based on the input `Timestamp`, and then query on `etcd` with this `Revision`.
In order to speed up getting `Revision` by `Timestamp`, `metaSnapShot` would maintain an array mapping the `Timestamp` to `Revision`. The default length of this array is `1024`, which is a type of circular array.
![snap_shot](./graphs/snapshot_2.png)
- `maxPos` points to the position where `Timestamp` and `Revision` are maximum.
- `minPos` points to the position where `Timestamp` and `Revision` are minimum.
- For each update operation, we first add `1` to `maxPos`. So the new `maxPos` would cover the old `minPos` position, and then add `1` to the old `minPos`
- From `0` to `maxPos` and from `minPos` to `1023`, which are two incremental arrays. We can use binary search to quickly get the `Revision` by the input `Timestamp`
- If the input `Timestamp` is greater than the `Timestamp` where the `maxPos` is located, then the `Revision` at the position of the `maxPos` will be returned
- If the input `Timestamp` is less than the `Timestamp` where `minPos` is located, `metaSnapshot` will load the historical `Timestamp` and `Revision` from `etcd` to find an appropriate `Revision` value.
The interface of `metaSnapShot` is defined as follows:
```go
type SnapShotKV interface {
Load(key string, ts typeutil.Timestamp) (string, error)
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
Save(key, value string) (typeutil.Timestamp, error)
MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
}
```
For the `Read` operations (`Load` and `LoadWithPrefix`), the input parameter `typeutil.Timestamp` is used to tell `metaSnapShot` to load the value based on that `Timestamp`.
For the `Write` operations (`Save`, `MultiSave`, `MultiSaveAndRemoveWithPrefix`), return values including `typeutil.Timestamp`, which is used to tell the caller when these write operations happened.
You might be curious about the parameter `additions` of `MultiSave` and `MultiSaveAndRemoveWithPrefix`: What does `additions` do, and why?
`additions` is an array of `func(ts typeutil.Timestamp) (string, string, error)`. So it's a function, receiving `typeutil.Timestamp` as an input, and returns two `string` which is `key-value` pair. If `error` is `nil` in the return value, `metaSnapShot` would write this `key-value` pair into `etcd`.
Referring to the document of `CreateCollection`, a timestamp is created for `Collection`, which is the timestamp when the `Collection`'s meta have been written into `etcd`, not the timestamp when `RootCoord` receives the request. So before writing the `Collection`'s meta into `etcd`, `metaSnapshot` would allocate a timestamp, and call all the `additions`. This would make sure the timestamp created for the `Collection` is correct.
![create_collection](./graphs/dml_create_collection.png)

View File

@ -11,8 +11,6 @@
#include "ValidationUtil.h"
#include "config/ServerConfig.h"
//#include "db/Constants.h"
//#include "db/Utils.h"
#include "knowhere/index/vector_index/ConfAdapter.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "utils/Log.h"

View File

@ -513,6 +513,7 @@ type mockETCDKV struct {
loadWithPrefix func(key string) ([]string, []string, error)
loadWithRevision func(key string) ([]string, []string, int64, error)
removeWithPrefix func(key string) error
walkWithPrefix func(prefix string, paginationSize int, fn func([]byte, []byte) error) error
}
func NewMockEtcdKV() *mockETCDKV {
@ -544,6 +545,9 @@ func NewMockEtcdKV() *mockETCDKV {
removeWithPrefix: func(key string) error {
return nil
},
walkWithPrefix: func(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
return nil
},
}
}
@ -582,6 +586,10 @@ func NewMockEtcdKVWithReal(real kv.MetaKv) *mockETCDKV {
}
}
func (mk *mockETCDKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
return mk.walkWithPrefix(prefix, paginationSize, fn)
}
func (mk *mockETCDKV) Save(key string, value string) error {
return mk.save(key, value)
}

View File

@ -24,13 +24,12 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/common"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
)
@ -86,6 +85,40 @@ func (kv *EmbedEtcdKV) GetPath(key string) string {
return path.Join(kv.rootPath, key)
}
func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}
key := prefix
for {
resp, err := kv.client.Get(ctx, key, opts...)
if err != nil {
return err
}
for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}
if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}
return nil
}
// LoadWithPrefix returns all the keys and values with the given key prefix
func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
key = path.Join(kv.rootPath, key)

View File

@ -17,16 +17,20 @@
package etcdkv_test
import (
"errors"
"fmt"
"sort"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
func TestEmbedEtcd(te *testing.T) {
@ -954,4 +958,79 @@ func TestEmbedEtcd(te *testing.T) {
assert.Error(t, err)
}
})
te.Run("Etcd WalkWithPagination", func(t *testing.T) {
rootPath := "/etcd/test/root/walkWithPagination"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)
defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
kvs := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
"B/100": "v5",
}
err = metaKv.MultiSave(kvs)
assert.NoError(t, err)
for k, v := range kvs {
actualV, err := metaKv.Load(k)
assert.NoError(t, err)
assert.Equal(t, v, actualV)
}
t.Run("apply function error ", func(t *testing.T) {
err = metaKv.WalkWithPrefix("A", 5, func(key []byte, value []byte) error {
return errors.New("error")
})
assert.Error(t, err)
})
t.Run("get with non-exist prefix ", func(t *testing.T) {
err = metaKv.WalkWithPrefix("non-exist-prefix", 5, func(key []byte, value []byte) error {
return nil
})
assert.NoError(t, err)
})
t.Run("with different pagination", func(t *testing.T) {
testFn := func(pagination int) {
expected := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
}
expectedSortedKey := maps.Keys(expected)
sort.Strings(expectedSortedKey)
ret := make(map[string]string)
actualSortedKey := make([]string, 0)
err = metaKv.WalkWithPrefix("A", pagination, func(key []byte, value []byte) error {
k := string(key)
k = k[len(rootPath)+1:]
ret[k] = string(value)
actualSortedKey = append(actualSortedKey, k)
return nil
})
assert.NoError(t, err)
assert.Equal(t, expected, ret, fmt.Errorf("pagination: %d", pagination))
assert.Equal(t, expectedSortedKey, actualSortedKey, fmt.Errorf("pagination: %d", pagination))
}
testFn(-100)
testFn(-1)
testFn(0)
testFn(1)
testFn(5)
testFn(100)
})
})
}

View File

@ -64,6 +64,43 @@ func (kv *EtcdKV) GetPath(key string) string {
return path.Join(kv.rootPath, key)
}
func (kv *EtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
start := time.Now()
prefix = path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}
key := prefix
for {
resp, err := kv.client.Get(ctx, key, opts...)
if err != nil {
return err
}
for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}
if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}
CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix))
return nil
}
// LoadWithPrefix returns all the keys and values with the given key prefix.
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
start := time.Now()

View File

@ -17,18 +17,22 @@
package etcdkv_test
import (
"errors"
"fmt"
"os"
"sort"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/funcutil"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
var Params paramtable.ComponentParam
@ -909,6 +913,91 @@ func TestEtcdKV_Load(te *testing.T) {
})
}
func Test_WalkWithPagination(t *testing.T) {
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
defer etcdCli.Close()
assert.NoError(t, err)
rootPath := "/etcd/test/root/pagination"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
kvs := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
"B/100": "v5",
}
err = etcdKV.MultiSave(kvs)
assert.NoError(t, err)
for k, v := range kvs {
actualV, err := etcdKV.Load(k)
assert.NoError(t, err)
assert.Equal(t, v, actualV)
}
t.Run("apply function error ", func(t *testing.T) {
err = etcdKV.WalkWithPrefix("A", 5, func(key []byte, value []byte) error {
return errors.New("error")
})
assert.Error(t, err)
})
t.Run("get with non-exist prefix ", func(t *testing.T) {
err = etcdKV.WalkWithPrefix("non-exist-prefix", 5, func(key []byte, value []byte) error {
return nil
})
assert.NoError(t, err)
})
t.Run("with different pagination", func(t *testing.T) {
testFn := func(pagination int) {
expected := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
}
expectedSortedKey := maps.Keys(expected)
sort.Strings(expectedSortedKey)
ret := make(map[string]string)
actualSortedKey := make([]string, 0)
err = etcdKV.WalkWithPrefix("A", pagination, func(key []byte, value []byte) error {
k := string(key)
k = k[len(rootPath)+1:]
ret[k] = string(value)
actualSortedKey = append(actualSortedKey, k)
return nil
})
assert.NoError(t, err)
assert.Equal(t, expected, ret, fmt.Errorf("pagination: %d", pagination))
assert.Equal(t, expectedSortedKey, actualSortedKey, fmt.Errorf("pagination: %d", pagination))
}
testFn(-100)
testFn(-1)
testFn(0)
testFn(1)
testFn(5)
testFn(100)
})
}
func TestElapse(t *testing.T) {
start := time.Now()
isElapse := etcdkv.CheckElapseAndWarn(start, "err message")

View File

@ -17,8 +17,9 @@
package kv
import (
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// CompareFailedError is a helper type for checking MetaKv CompareAndSwap series func error type
@ -46,7 +47,6 @@ type BaseKV interface {
Remove(key string) error
MultiRemove(keys []string) error
RemoveWithPrefix(key string) error
Close()
}
@ -59,6 +59,7 @@ type TxnKV interface {
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
}
//go:generate mockery --name=MetaKv --with-expecter
// MetaKv is TxnKV for metadata. It should save data with lease.
type MetaKv interface {
TxnKV
@ -76,6 +77,7 @@ type MetaKv interface {
KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error)
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error)
WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error
}
//go:generate mockery --name=SnapShotKV --with-expecter

View File

@ -20,9 +20,9 @@ import (
"strings"
"sync"
"github.com/milvus-io/milvus/internal/common"
"github.com/google/btree"
"github.com/milvus-io/milvus/internal/common"
)
// MemoryKV implements BaseKv interface and relies on underling btree.BTree.

1188
internal/kv/mocks/MetaKv.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
@ -44,8 +44,8 @@ type SnapShotKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) Load(key interface{}, ts interface{}) *SnapShotKV_Load_Call {
return &SnapShotKV_Load_Call{Call: _e.mock.On("Load", key, ts)}
}
@ -100,8 +100,8 @@ type SnapShotKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) LoadWithPrefix(key interface{}, ts interface{}) *SnapShotKV_LoadWithPrefix_Call {
return &SnapShotKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key, ts)}
}
@ -138,8 +138,8 @@ type SnapShotKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - ts uint64
// - kvs map[string]string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSave(kvs interface{}, ts interface{}) *SnapShotKV_MultiSave_Call {
return &SnapShotKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs, ts)}
}
@ -176,9 +176,9 @@ type SnapShotKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - ts uint64
// - saves map[string]string
// - removals []string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
return &SnapShotKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals, ts)}
}
@ -215,9 +215,9 @@ type SnapShotKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - ts uint64
// - key string
// - value string
// - ts uint64
func (_e *SnapShotKV_Expecter) Save(key interface{}, value interface{}, ts interface{}) *SnapShotKV_Save_Call {
return &SnapShotKV_Save_Call{Call: _e.mock.On("Save", key, value, ts)}
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
@ -71,7 +71,7 @@ type TxnKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Load(key interface{}) *TxnKV_Load_Call {
return &TxnKV_Load_Call{Call: _e.mock.On("Load", key)}
}
@ -126,7 +126,7 @@ type TxnKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) LoadWithPrefix(key interface{}) *TxnKV_LoadWithPrefix_Call {
return &TxnKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)}
}
@ -172,7 +172,7 @@ type TxnKV_MultiLoad_Call struct {
}
// MultiLoad is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiLoad(keys interface{}) *TxnKV_MultiLoad_Call {
return &TxnKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)}
}
@ -209,7 +209,7 @@ type TxnKV_MultiRemove_Call struct {
}
// MultiRemove is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemove(keys interface{}) *TxnKV_MultiRemove_Call {
return &TxnKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)}
}
@ -246,7 +246,7 @@ type TxnKV_MultiRemoveWithPrefix_Call struct {
}
// MultiRemoveWithPrefix is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemoveWithPrefix(keys interface{}) *TxnKV_MultiRemoveWithPrefix_Call {
return &TxnKV_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)}
}
@ -283,7 +283,7 @@ type TxnKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - kvs map[string]string
func (_e *TxnKV_Expecter) MultiSave(kvs interface{}) *TxnKV_MultiSave_Call {
return &TxnKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)}
}
@ -320,8 +320,8 @@ type TxnKV_MultiSaveAndRemove_Call struct {
}
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call {
return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
}
@ -358,8 +358,8 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
}
@ -396,7 +396,7 @@ type TxnKV_Remove_Call struct {
}
// Remove is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Remove(key interface{}) *TxnKV_Remove_Call {
return &TxnKV_Remove_Call{Call: _e.mock.On("Remove", key)}
}
@ -433,7 +433,7 @@ type TxnKV_RemoveWithPrefix_Call struct {
}
// RemoveWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) RemoveWithPrefix(key interface{}) *TxnKV_RemoveWithPrefix_Call {
return &TxnKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)}
}
@ -470,8 +470,8 @@ type TxnKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *TxnKV_Expecter) Save(key interface{}, value interface{}) *TxnKV_Save_Call {
return &TxnKV_Save_Call{Call: _e.mock.On("Save", key, value)}
}

View File

@ -23,16 +23,16 @@ import (
"path"
"sync"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
// proxyManager manages proxy connected to the rootcoord
@ -77,7 +77,7 @@ func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) {
// WatchProxy starts a goroutine to watch proxy session changes on etcd
func (p *proxyManager) WatchProxy() error {
ctx, cancel := context.WithTimeout(p.ctx, rootcoord.RequestTimeout)
ctx, cancel := context.WithTimeout(p.ctx, etcdkv.RequestTimeout)
defer cancel()
sessions, rev, err := p.getSessionsOnEtcd(ctx)
@ -210,7 +210,7 @@ func (p *proxyManager) Stop() {
// listProxyInEtcd helper function lists proxy in etcd
func listProxyInEtcd(ctx context.Context, cli *clientv3.Client) (map[int64]*sessionutil.Session, error) {
ctx2, cancel := context.WithTimeout(ctx, rootcoord.RequestTimeout)
ctx2, cancel := context.WithTimeout(ctx, etcdkv.RequestTimeout)
defer cancel()
resp, err := cli.Get(
ctx2,