Add based on timetravel GC for snapshot KV (#21417)

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/21524/head
jaime 2023-01-04 21:37:35 +08:00 committed by GitHub
parent 3e78fc993b
commit 58b79eb74c
17 changed files with 1874 additions and 1135 deletions

View File

@ -1,109 +0,0 @@
# metaSnapShot
`metaSnapShot` enables `RootCoord` to query historical meta based on a timestamp, and it provides a `Key-Value` interface. An example illustrates what `metaSnapShot` does. The following figure shows a series of operations that happened on the timeline.
![snap_shot](./graphs/snapshot_1.png)
| Timestamp | Operation |
|-----------|-----------|
| 100 | Set A=1 |
| 200 | Set B=2 |
| 300 | Set C=3 |
| 400 | Set A=10 |
| 500 | Delete B |
| 600 | Delete C |
Now assume the wall clock is at `Timestamp=700`, so `B` has already been deleted from the system. How can we find the value of `B` at `Timestamp=450`? `metaSnapShot` was designed to solve this problem.
We need to briefly introduce `etcd`'s `MVCC` before `metaSnapShot`. Here is the test program:
```go
package etcdkv

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.etcd.io/etcd/clientv3"
)

func TestMVCC(t *testing.T) {
	addr := []string{"127.0.0.1:2379"}
	cli, err := clientv3.New(clientv3.Config{Endpoints: addr})
	assert.Nil(t, err)
	assert.NotNil(t, cli)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	testKey := "test-key"
	rsp0, err := cli.Delete(ctx, testKey)
	assert.Nil(t, err)
	t.Logf("revision:%d", rsp0.Header.Revision)
	rsp1, err := cli.Put(ctx, testKey, "value1")
	assert.Nil(t, err)
	t.Logf("revision:%d,value1", rsp1.Header.Revision)
	rsp2, err := cli.Put(ctx, testKey, "value2")
	assert.Nil(t, err)
	t.Logf("revision:%d,value2", rsp2.Header.Revision)
	rsp3, err := cli.Get(ctx, testKey, clientv3.WithRev(rsp1.Header.Revision))
	assert.Nil(t, err)
	t.Logf("get at revision:%d, value=%s", rsp1.Header.Revision, string(rsp3.Kvs[0].Value))
}
```
The output of the above test program should look like this:
```text
=== RUN TestMVCC
etcd_mvcc_test.go:23: revision:401
etcd_mvcc_test.go:27: revision:402,value1
etcd_mvcc_test.go:31: revision:403,value2
etcd_mvcc_test.go:35: get at revision:402, value=value1
--- PASS: TestMVCC (0.01s)
```
In `etcd`, each write operation increments `Revision` by `1`. So if we specify a `Revision` value in a query, we get the historical value as of that `Revision`.
`metaSnapShot` is based on this feature of `etcd`. It writes an extra `Timestamp` on each write operation, and `etcd`'s `Txn` makes sure that the `Timestamp` has the same `Revision` as the user data.
When querying, `metaSnapShot` finds an appropriate `Revision` based on the input `Timestamp`, and then queries `etcd` with this `Revision`.
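The write and query paths described above can be sketched with a small in-memory stand-in for `etcd`. This is only an illustration under stated assumptions: the `store` type, its `txn`, `getAt`, `save`, and `load` methods, and the `tsKey` constant are hypothetical names, deletes are not modeled, and the real implementation talks to `etcd` through `clientv3`.

```go
package main

import (
	"fmt"
	"strconv"
)

// version is one historical value of a key, tagged with the revision that wrote it.
type version struct {
	rev   int64
	value string
}

// store is a hypothetical in-memory multi-version KV, standing in for etcd.
type store struct {
	rev     int64
	history map[string][]version // per key, in ascending revision order
}

const tsKey = "timestamp" // hypothetical key that holds the last written timestamp

func newStore() *store { return &store{history: map[string][]version{}} }

// txn applies all puts under one new revision, like a single etcd Txn.
func (s *store) txn(puts map[string]string) int64 {
	s.rev++
	for k, v := range puts {
		s.history[k] = append(s.history[k], version{rev: s.rev, value: v})
	}
	return s.rev
}

// getAt returns the value of key as of revision rev.
func (s *store) getAt(key string, rev int64) (string, bool) {
	vs := s.history[key]
	for i := len(vs) - 1; i >= 0; i-- {
		if vs[i].rev <= rev {
			return vs[i].value, true
		}
	}
	return "", false
}

// save writes the user data and the timestamp in one transaction,
// so both share the same revision.
func (s *store) save(key, value string, ts uint64) int64 {
	return s.txn(map[string]string{key: value, tsKey: strconv.FormatUint(ts, 10)})
}

// load finds the newest revision whose timestamp is <= ts, then reads key at that revision.
func (s *store) load(key string, ts uint64) (string, bool) {
	vs := s.history[tsKey]
	for i := len(vs) - 1; i >= 0; i-- {
		cur, err := strconv.ParseUint(vs[i].value, 10, 64)
		if err == nil && cur <= ts {
			return s.getAt(key, vs[i].rev)
		}
	}
	return "", false
}

func main() {
	s := newStore()
	s.save("A", "1", 100)
	s.save("A", "10", 400)
	v, ok := s.load("A", 250)
	fmt.Println(v, ok) // 1 true
	v, ok = s.load("A", 400)
	fmt.Println(v, ok) // 10 true
}
```

The linear scan over the timestamp history is the slow part of this sketch, which is the cost the timestamp-to-revision cache is meant to avoid.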
To speed up getting the `Revision` by `Timestamp`, `metaSnapShot` maintains a circular array that maps `Timestamp` to `Revision`. The default length of this array is `1024`.
![snap_shot](./graphs/snapshot_2.png)
- `maxPos` points to the position where `Timestamp` and `Revision` are at their maximum.
- `minPos` points to the position where `Timestamp` and `Revision` are at their minimum.
- For each update operation, we first add `1` to `maxPos`. Once the array is full, the new `maxPos` overwrites the old `minPos` position, so we then add `1` to `minPos` as well.
- The ranges from `0` to `maxPos` and from `minPos` to `1023` are two increasing arrays, so we can use binary search to quickly get the `Revision` for the input `Timestamp`.
- If the input `Timestamp` is greater than the `Timestamp` at `maxPos`, the `Revision` at `maxPos` is returned.
- If the input `Timestamp` is less than the `Timestamp` at `minPos`, `metaSnapShot` loads the historical `Timestamp` and `Revision` from `etcd` to find an appropriate `Revision` value.
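The cache behaviour in the list above can be sketched as a small ring buffer. This is a simplified variation, not the original implementation: instead of searching the two physical ranges separately, it maps logical indices onto the ring and runs one binary search with `sort.Search`. The names `ring`, `put`, and `revAt` are hypothetical, and a `false` result stands for the case where the real code falls back to `etcd`.

```go
package main

import (
	"fmt"
	"sort"
)

// pair maps one timestamp to the etcd revision written at that time.
type pair struct {
	ts  uint64
	rev int64
}

// ring is a fixed-size circular array of pairs in ascending timestamp order.
type ring struct {
	buf   []pair
	start int // physical index of the oldest entry (the "minPos")
	count int // number of valid entries
}

func newRing(size int) *ring { return &ring{buf: make([]pair, size)} }

// at returns the i-th oldest entry (logical index 0 is the oldest).
func (r *ring) at(i int) pair { return r.buf[(r.start+i)%len(r.buf)] }

// put appends the newest pair, overwriting the oldest one once the ring is full.
func (r *ring) put(ts uint64, rev int64) {
	if r.count < len(r.buf) {
		r.buf[(r.start+r.count)%len(r.buf)] = pair{ts: ts, rev: rev}
		r.count++
		return
	}
	r.buf[r.start] = pair{ts: ts, rev: rev}
	r.start = (r.start + 1) % len(r.buf)
}

// revAt returns the revision of the newest entry whose timestamp is <= ts.
// It reports false when ts is older than everything cached.
func (r *ring) revAt(ts uint64) (int64, bool) {
	// First logical index whose timestamp is strictly greater than ts.
	i := sort.Search(r.count, func(i int) bool { return r.at(i).ts > ts })
	if i == 0 {
		return 0, false
	}
	return r.at(i - 1).rev, true
}

func main() {
	r := newRing(4)
	for i := 1; i <= 6; i++ {
		r.put(uint64(i*100), int64(i)) // ts=100..600, rev=1..6
	}
	rev, ok := r.revAt(450)
	fmt.Println(rev, ok) // 4 true
	rev, ok = r.revAt(700)
	fmt.Println(rev, ok) // 6 true
	rev, ok = r.revAt(250)
	fmt.Println(rev, ok) // 0 false (ts=100 and ts=200 were evicted)
}
```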
The interface of `metaSnapShot` is defined as follows:
```go
type SnapShotKV interface {
	Load(key string, ts typeutil.Timestamp) (string, error)
	LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)

	Save(key, value string) (typeutil.Timestamp, error)
	MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
	MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error)
}
```
For the read operations (`Load` and `LoadWithPrefix`), the input parameter `typeutil.Timestamp` tells `metaSnapShot` to load the value as of that `Timestamp`.
For the write operations (`Save`, `MultiSave`, `MultiSaveAndRemoveWithPrefix`), the return values include a `typeutil.Timestamp`, which tells the caller when these write operations happened.
You might be curious about the parameter `additions` of `MultiSave` and `MultiSaveAndRemoveWithPrefix`: What does `additions` do, and why?
`additions` is an array of `func(ts typeutil.Timestamp) (string, string, error)`. Each element is a function that receives a `typeutil.Timestamp` as input and returns two `string` values forming a `key-value` pair. If the returned `error` is `nil`, `metaSnapShot` writes this `key-value` pair into `etcd`.
According to the `CreateCollection` document, a timestamp is created for a `Collection`; it is the timestamp at which the `Collection`'s meta has been written into `etcd`, not the timestamp at which `RootCoord` received the request. So before writing the `Collection`'s meta into `etcd`, `metaSnapShot` allocates a timestamp and calls all the `additions`. This makes sure the timestamp created for the `Collection` is correct.
![create_collection](./graphs/dml_create_collection.png)

View File

@ -11,8 +11,6 @@
#include "ValidationUtil.h"
#include "config/ServerConfig.h"
//#include "db/Constants.h"
//#include "db/Utils.h"
#include "knowhere/index/vector_index/ConfAdapter.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "utils/Log.h"

View File

@ -520,6 +520,7 @@ type mockETCDKV struct {
loadWithPrefix func(key string) ([]string, []string, error)
loadWithRevision func(key string) ([]string, []string, int64, error)
removeWithPrefix func(key string) error
walkWithPrefix func(prefix string, paginationSize int, fn func([]byte, []byte) error) error
}
func NewMockEtcdKV() *mockETCDKV {
@ -551,6 +552,9 @@ func NewMockEtcdKV() *mockETCDKV {
removeWithPrefix: func(key string) error {
return nil
},
walkWithPrefix: func(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
return nil
},
}
}
@ -589,6 +593,10 @@ func NewMockEtcdKVWithReal(real kv.MetaKv) *mockETCDKV {
}
}
func (mk *mockETCDKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
return mk.walkWithPrefix(prefix, paginationSize, fn)
}
func (mk *mockETCDKV) Save(key string, value string) error {
return mk.save(key, value)
}

View File

@ -24,13 +24,12 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/common"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
)
@ -86,6 +85,40 @@ func (kv *EmbedEtcdKV) GetPath(key string) string {
return path.Join(kv.rootPath, key)
}
func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}
key := prefix
for {
resp, err := kv.client.Get(ctx, key, opts...)
if err != nil {
return err
}
for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}
if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}
return nil
}
// LoadWithPrefix returns all the keys and values with the given key prefix
func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
key = path.Join(kv.rootPath, key)

View File

@ -17,16 +17,20 @@
package etcdkv_test
import (
"errors"
"fmt"
"sort"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
func TestEmbedEtcd(te *testing.T) {
@ -954,4 +958,79 @@ func TestEmbedEtcd(te *testing.T) {
assert.Error(t, err)
}
})
te.Run("Etcd WalkWithPagination", func(t *testing.T) {
rootPath := "/etcd/test/root/walkWithPagination"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)
defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
kvs := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
"B/100": "v5",
}
err = metaKv.MultiSave(kvs)
assert.NoError(t, err)
for k, v := range kvs {
actualV, err := metaKv.Load(k)
assert.NoError(t, err)
assert.Equal(t, v, actualV)
}
t.Run("apply function error ", func(t *testing.T) {
err = metaKv.WalkWithPrefix("A", 5, func(key []byte, value []byte) error {
return errors.New("error")
})
assert.Error(t, err)
})
t.Run("get with non-exist prefix ", func(t *testing.T) {
err = metaKv.WalkWithPrefix("non-exist-prefix", 5, func(key []byte, value []byte) error {
return nil
})
assert.NoError(t, err)
})
t.Run("with different pagination", func(t *testing.T) {
testFn := func(pagination int) {
expected := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
}
expectedSortedKey := maps.Keys(expected)
sort.Strings(expectedSortedKey)
ret := make(map[string]string)
actualSortedKey := make([]string, 0)
err = metaKv.WalkWithPrefix("A", pagination, func(key []byte, value []byte) error {
k := string(key)
k = k[len(rootPath)+1:]
ret[k] = string(value)
actualSortedKey = append(actualSortedKey, k)
return nil
})
assert.NoError(t, err)
assert.Equal(t, expected, ret, fmt.Errorf("pagination: %d", pagination))
assert.Equal(t, expectedSortedKey, actualSortedKey, fmt.Errorf("pagination: %d", pagination))
}
testFn(-100)
testFn(-1)
testFn(0)
testFn(1)
testFn(5)
testFn(100)
})
})
}

View File

@ -64,6 +64,43 @@ func (kv *EtcdKV) GetPath(key string) string {
return path.Join(kv.rootPath, key)
}
func (kv *EtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
start := time.Now()
prefix = path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}
key := prefix
for {
resp, err := kv.client.Get(ctx, key, opts...)
if err != nil {
return err
}
for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}
if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}
CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix))
return nil
}
// LoadWithPrefix returns all the keys and values with the given key prefix.
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
start := time.Now()

View File

@ -17,18 +17,22 @@
package etcdkv_test
import (
"errors"
"fmt"
"os"
"sort"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/funcutil"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
var Params paramtable.ComponentParam
@ -909,6 +913,91 @@ func TestEtcdKV_Load(te *testing.T) {
})
}
func Test_WalkWithPagination(t *testing.T) {
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
defer etcdCli.Close()
assert.NoError(t, err)
rootPath := "/etcd/test/root/pagination"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
kvs := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
"B/100": "v5",
}
err = etcdKV.MultiSave(kvs)
assert.NoError(t, err)
for k, v := range kvs {
actualV, err := etcdKV.Load(k)
assert.NoError(t, err)
assert.Equal(t, v, actualV)
}
t.Run("apply function error ", func(t *testing.T) {
err = etcdKV.WalkWithPrefix("A", 5, func(key []byte, value []byte) error {
return errors.New("error")
})
assert.Error(t, err)
})
t.Run("get with non-exist prefix ", func(t *testing.T) {
err = etcdKV.WalkWithPrefix("non-exist-prefix", 5, func(key []byte, value []byte) error {
return nil
})
assert.NoError(t, err)
})
t.Run("with different pagination", func(t *testing.T) {
testFn := func(pagination int) {
expected := map[string]string{
"A/100": "v1",
"AA/100": "v2",
"AB/100": "v3",
"AB/2/100": "v4",
}
expectedSortedKey := maps.Keys(expected)
sort.Strings(expectedSortedKey)
ret := make(map[string]string)
actualSortedKey := make([]string, 0)
err = etcdKV.WalkWithPrefix("A", pagination, func(key []byte, value []byte) error {
k := string(key)
k = k[len(rootPath)+1:]
ret[k] = string(value)
actualSortedKey = append(actualSortedKey, k)
return nil
})
assert.NoError(t, err)
assert.Equal(t, expected, ret, fmt.Errorf("pagination: %d", pagination))
assert.Equal(t, expectedSortedKey, actualSortedKey, fmt.Errorf("pagination: %d", pagination))
}
testFn(-100)
testFn(-1)
testFn(0)
testFn(1)
testFn(5)
testFn(100)
})
}
func TestElapse(t *testing.T) {
start := time.Now()
isElapse := etcdkv.CheckElapseAndWarn(start, "err message")

View File

@ -17,8 +17,9 @@
package kv
import (
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// CompareFailedError is a helper type for checking MetaKv CompareAndSwap series func error type
@ -46,7 +47,6 @@ type BaseKV interface {
Remove(key string) error
MultiRemove(keys []string) error
RemoveWithPrefix(key string) error
Close()
}
@ -59,6 +59,7 @@ type TxnKV interface {
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
}
//go:generate mockery --name=MetaKv --with-expecter
// MetaKv is TxnKV for metadata. It should save data with lease.
type MetaKv interface {
TxnKV
@ -76,6 +77,7 @@ type MetaKv interface {
KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error)
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error)
WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error
}
//go:generate mockery --name=SnapShotKV --with-expecter

View File

@ -20,9 +20,9 @@ import (
"strings"
"sync"
"github.com/milvus-io/milvus/internal/common"
"github.com/google/btree"
"github.com/milvus-io/milvus/internal/common"
)
// MemoryKV implements BaseKv interface and relies on underling btree.BTree.

1188
internal/kv/mocks/MetaKv.go Normal file

File diff suppressed because it is too large

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
@ -44,8 +44,8 @@ type SnapShotKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) Load(key interface{}, ts interface{}) *SnapShotKV_Load_Call {
return &SnapShotKV_Load_Call{Call: _e.mock.On("Load", key, ts)}
}
@ -100,8 +100,8 @@ type SnapShotKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - ts uint64
// - key string
// - ts uint64
func (_e *SnapShotKV_Expecter) LoadWithPrefix(key interface{}, ts interface{}) *SnapShotKV_LoadWithPrefix_Call {
return &SnapShotKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key, ts)}
}
@ -138,8 +138,8 @@ type SnapShotKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - ts uint64
// - kvs map[string]string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSave(kvs interface{}, ts interface{}) *SnapShotKV_MultiSave_Call {
return &SnapShotKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs, ts)}
}
@ -176,9 +176,9 @@ type SnapShotKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - ts uint64
// - saves map[string]string
// - removals []string
// - ts uint64
func (_e *SnapShotKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call {
return &SnapShotKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals, ts)}
}
@ -215,9 +215,9 @@ type SnapShotKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - ts uint64
// - key string
// - value string
// - ts uint64
func (_e *SnapShotKV_Expecter) Save(key interface{}, value interface{}, ts interface{}) *SnapShotKV_Save_Call {
return &SnapShotKV_Save_Call{Call: _e.mock.On("Save", key, value, ts)}
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
@ -71,7 +71,7 @@ type TxnKV_Load_Call struct {
}
// Load is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Load(key interface{}) *TxnKV_Load_Call {
return &TxnKV_Load_Call{Call: _e.mock.On("Load", key)}
}
@ -126,7 +126,7 @@ type TxnKV_LoadWithPrefix_Call struct {
}
// LoadWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) LoadWithPrefix(key interface{}) *TxnKV_LoadWithPrefix_Call {
return &TxnKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)}
}
@ -172,7 +172,7 @@ type TxnKV_MultiLoad_Call struct {
}
// MultiLoad is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiLoad(keys interface{}) *TxnKV_MultiLoad_Call {
return &TxnKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)}
}
@ -209,7 +209,7 @@ type TxnKV_MultiRemove_Call struct {
}
// MultiRemove is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemove(keys interface{}) *TxnKV_MultiRemove_Call {
return &TxnKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)}
}
@ -246,7 +246,7 @@ type TxnKV_MultiRemoveWithPrefix_Call struct {
}
// MultiRemoveWithPrefix is a helper method to define mock.On call
// - keys []string
// - keys []string
func (_e *TxnKV_Expecter) MultiRemoveWithPrefix(keys interface{}) *TxnKV_MultiRemoveWithPrefix_Call {
return &TxnKV_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)}
}
@ -283,7 +283,7 @@ type TxnKV_MultiSave_Call struct {
}
// MultiSave is a helper method to define mock.On call
// - kvs map[string]string
// - kvs map[string]string
func (_e *TxnKV_Expecter) MultiSave(kvs interface{}) *TxnKV_MultiSave_Call {
return &TxnKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)}
}
@ -320,8 +320,8 @@ type TxnKV_MultiSaveAndRemove_Call struct {
}
// MultiSaveAndRemove is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call {
return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)}
}
@ -358,8 +358,8 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct {
}
// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call
// - saves map[string]string
// - removals []string
// - saves map[string]string
// - removals []string
func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call {
return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)}
}
@ -396,7 +396,7 @@ type TxnKV_Remove_Call struct {
}
// Remove is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) Remove(key interface{}) *TxnKV_Remove_Call {
return &TxnKV_Remove_Call{Call: _e.mock.On("Remove", key)}
}
@ -433,7 +433,7 @@ type TxnKV_RemoveWithPrefix_Call struct {
}
// RemoveWithPrefix is a helper method to define mock.On call
// - key string
// - key string
func (_e *TxnKV_Expecter) RemoveWithPrefix(key interface{}) *TxnKV_RemoveWithPrefix_Call {
return &TxnKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)}
}
@ -470,8 +470,8 @@ type TxnKV_Save_Call struct {
}
// Save is a helper method to define mock.On call
// - key string
// - value string
// - key string
// - value string
func (_e *TxnKV_Expecter) Save(key interface{}, value interface{}) *TxnKV_Save_Call {
return &TxnKV_Save_Call{Call: _e.mock.On("Save", key, value)}
}

View File

@ -1,410 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"fmt"
"path"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
// RequestTimeout timeout for request
RequestTimeout = 10 * time.Second
)
type rtPair struct {
rev int64
ts typeutil.Timestamp
}
type MetaSnapshot struct {
cli *clientv3.Client
root string
tsKey string
lock sync.RWMutex
ts2Rev []rtPair
minPos int
maxPos int
numTs int
}
func NewMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*MetaSnapshot, error) {
if bufSize <= 0 {
bufSize = 1024
}
ms := &MetaSnapshot{
cli: cli,
root: root,
tsKey: tsKey,
lock: sync.RWMutex{},
ts2Rev: make([]rtPair, bufSize),
minPos: 0,
maxPos: 0,
numTs: 0,
}
if err := ms.loadTs(); err != nil {
return nil, err
}
return ms, nil
}
func (ms *MetaSnapshot) loadTs() error {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
key := path.Join(ms.root, ms.tsKey)
resp, err := ms.cli.Get(ctx, key)
if err != nil {
return err
}
if len(resp.Kvs) <= 0 {
return nil
}
version := resp.Kvs[0].Version
revision := resp.Kvs[0].ModRevision
createRevision := resp.Kvs[0].CreateRevision
strTs := string(resp.Kvs[0].Value)
ts, err := strconv.ParseUint(strTs, 10, 64)
if err != nil {
return err
}
log.Info("Load last ts", zap.Int64("version", version), zap.Int64("revision", revision))
ms.initTs(revision, ts)
// start from revision-1, until equals to create revision
for revision--; revision >= createRevision; revision-- {
if ms.numTs == len(ms.ts2Rev) {
break
}
resp, err = ms.cli.Get(ctx, key, clientv3.WithRev(revision))
if err != nil {
return err
}
if len(resp.Kvs) <= 0 {
return nil
}
curVer := resp.Kvs[0].Version
curRev := resp.Kvs[0].ModRevision
if curVer > version {
log.Warn("version go backwards", zap.Int64("curVer", curVer), zap.Int64("version", version))
return nil
}
if curVer == version {
log.Debug("Snapshot found save version with different revision", zap.Int64("revision", revision), zap.Int64("version", version))
}
strTs := string(resp.Kvs[0].Value)
if strTs == "0" {
//#issue 7150, index building inserted "0", skipping
//this is a special fix for backward compatibility, the previous version will put 0 ts into the Snapshot building index
continue
}
curTs, err := strconv.ParseUint(strTs, 10, 64)
if err != nil {
return err
}
if curTs >= ts {
return fmt.Errorf("timestamp go back, curTs=%d,ts=%d", curTs, ts)
}
ms.initTs(curRev, curTs)
ts = curTs
revision = curRev
version = curVer
}
return nil
}
func (ms *MetaSnapshot) maxTs() typeutil.Timestamp {
return ms.ts2Rev[ms.maxPos].ts
}
func (ms *MetaSnapshot) minTs() typeutil.Timestamp {
return ms.ts2Rev[ms.minPos].ts
}
func (ms *MetaSnapshot) initTs(rev int64, ts typeutil.Timestamp) {
log.Debug("init meta Snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
if ms.numTs == 0 {
ms.maxPos = len(ms.ts2Rev) - 1
ms.minPos = len(ms.ts2Rev) - 1
ms.numTs = 1
ms.ts2Rev[ms.maxPos].rev = rev
ms.ts2Rev[ms.maxPos].ts = ts
} else if ms.numTs < len(ms.ts2Rev) {
ms.minPos--
ms.numTs++
ms.ts2Rev[ms.minPos].rev = rev
ms.ts2Rev[ms.minPos].ts = ts
}
}
func (ms *MetaSnapshot) putTs(rev int64, ts typeutil.Timestamp) {
log.Debug("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
ms.maxPos++
if ms.maxPos == len(ms.ts2Rev) {
ms.maxPos = 0
}
ms.ts2Rev[ms.maxPos].rev = rev
ms.ts2Rev[ms.maxPos].ts = ts
if ms.numTs < len(ms.ts2Rev) {
ms.numTs++
} else {
ms.minPos++
if ms.minPos == len(ms.ts2Rev) {
ms.minPos = 0
}
}
}
func (ms *MetaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 {
if length == 1 {
return ms.ts2Rev[start].rev
}
begin := start
end := begin + length
mid := (begin + end) / 2
for {
if ms.ts2Rev[mid].ts == ts {
return ms.ts2Rev[mid].rev
}
if mid == begin {
if ms.ts2Rev[mid].ts < ts || mid == start {
return ms.ts2Rev[mid].rev
}
return ms.ts2Rev[mid-1].rev
}
if ms.ts2Rev[mid].ts > ts {
end = mid
} else if ms.ts2Rev[mid].ts < ts {
begin = mid + 1
}
mid = (begin + end) / 2
}
}
func (ms *MetaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 {
if ms.numTs == 0 {
return 0
}
if ts >= ms.ts2Rev[ms.maxPos].ts {
return ms.ts2Rev[ms.maxPos].rev
}
if ts < ms.ts2Rev[ms.minPos].ts {
return 0
}
if ms.maxPos > ms.minPos {
return ms.searchOnCache(ts, ms.minPos, ms.maxPos-ms.minPos+1)
}
topVal := ms.ts2Rev[len(ms.ts2Rev)-1]
botVal := ms.ts2Rev[0]
minVal := ms.ts2Rev[ms.minPos]
maxVal := ms.ts2Rev[ms.maxPos]
if ts >= topVal.ts && ts < botVal.ts {
return topVal.rev
} else if ts >= minVal.ts && ts < topVal.ts {
return ms.searchOnCache(ts, ms.minPos, len(ms.ts2Rev)-ms.minPos)
} else if ts >= botVal.ts && ts < maxVal.ts {
return ms.searchOnCache(ts, 0, ms.maxPos+1)
}
return 0
}
func (ms *MetaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 {
if rev < 2 {
return 0
}
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
for rev--; rev >= 2; rev-- {
resp, err := ms.cli.Get(ctx, path.Join(ms.root, ms.tsKey), clientv3.WithRev(rev))
if err != nil {
log.Debug("get ts from etcd failed", zap.Error(err))
return 0
}
if len(resp.Kvs) <= 0 {
return 0
}
rev = resp.Kvs[0].ModRevision
curTs, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
if err != nil {
log.Debug("parse timestam error", zap.String("input", string(resp.Kvs[0].Value)), zap.Error(err))
return 0
}
if curTs <= ts {
return rev
}
}
return 0
}
func (ms *MetaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
rev := ms.getRevOnCache(ts)
if rev > 0 {
return rev, nil
}
rev = ms.ts2Rev[ms.minPos].rev
rev = ms.getRevOnEtcd(ts, rev)
if rev > 0 {
return rev, nil
}
return 0, fmt.Errorf("can't find revision on ts=%d", ts)
}
func (ms *MetaSnapshot) Save(key, value string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
strTs := strconv.FormatInt(int64(ts), 10)
resp, err := ms.cli.Txn(ctx).If().Then(
clientv3.OpPut(path.Join(ms.root, key), value),
clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs),
).Commit()
if err != nil {
return err
}
ms.putTs(resp.Header.Revision, ts)
return nil
}
func (ms *MetaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var resp *clientv3.GetResponse
var err error
var rev int64
if ts == 0 {
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key))
if err != nil {
return "", err
}
} else {
rev, err = ms.getRev(ts)
if err != nil {
return "", err
}
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithRev(rev))
if err != nil {
return "", err
}
}
if len(resp.Kvs) == 0 {
return "", fmt.Errorf("there is no value on key = %s, ts = %d", key, ts)
}
return string(resp.Kvs[0].Value), nil
}
func (ms *MetaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ops := make([]clientv3.Op, 0, len(kvs)+1)
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return err
}
ms.putTs(resp.Header.Revision, ts)
return nil
}
func (ms *MetaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var resp *clientv3.GetResponse
var err error
var rev int64
if ts == 0 {
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
} else {
rev, err = ms.getRev(ts)
if err != nil {
return nil, nil, err
}
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithRev(rev), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
}
keys := make([]string, 0, len(resp.Kvs))
values := make([]string, 0, len(resp.Kvs))
tk := path.Join(ms.root, "k")
prefixLen := len(tk) - 1
for _, kv := range resp.Kvs {
tk = string(kv.Key)
tk = tk[prefixLen:]
keys = append(keys, tk)
values = append(values, string(kv.Value))
}
return keys, values, nil
}
func (ms *MetaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1)
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
for _, key := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key), clientv3.WithPrefix()))
}
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return err
}
ms.putTs(resp.Header.Revision, ts)
return nil
}
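
Reviewer note: `LoadWithPrefix` above strips the root from returned keys by joining the root with a one-character probe key and subtracting one from the resulting length. A minimal sketch of why this works whether or not the root has a trailing slash; `stripRoot` is a hypothetical helper name used only for illustration, not part of the removed file:

```go
package main

import (
	"fmt"
	"path"
)

// stripRoot is a hypothetical helper that mirrors the prefix-length trick:
// path.Join cleans the root, so len(Join(root, "k"))-1 is the length of the
// cleaned root including its trailing separator.
func stripRoot(root, fullKey string) string {
	probe := path.Join(root, "k")
	prefixLen := len(probe) - 1
	// like the original code, this assumes fullKey really starts with the root
	return fullKey[prefixLen:]
}

func main() {
	for _, root := range []string{"/test/meta", "/test/meta/"} {
		full := path.Join(root, "k1")
		fmt.Println(stripRoot(root, full))
	}
}
```

Like the original loop, the sketch would panic on a key shorter than the prefix, so it relies on etcd only returning keys under the requested root.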


@ -1,519 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"fmt"
"math/rand"
"os"
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
var Params paramtable.ComponentParam
func TestMain(m *testing.M) {
Params.Init()
code := m.Run()
os.Exit(code)
}
func TestMetaSnapshot(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 8; i++ {
vtso = typeutil.Timestamp(100 + i)
ts := ftso()
err = ms.Save("abc", fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
_, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i))
assert.Nil(t, err)
}
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
}
func TestSearchOnCache(t *testing.T) {
ms := &MetaSnapshot{}
for i := 0; i < 8; i++ {
ms.ts2Rev = append(ms.ts2Rev,
rtPair{
rev: int64(i * 2),
ts: typeutil.Timestamp(i * 2),
})
}
rev := ms.searchOnCache(9, 0, 8)
assert.Equal(t, int64(8), rev)
rev = ms.searchOnCache(1, 0, 2)
assert.Equal(t, int64(0), rev)
rev = ms.searchOnCache(1, 0, 8)
assert.Equal(t, int64(0), rev)
rev = ms.searchOnCache(14, 0, 8)
assert.Equal(t, int64(14), rev)
rev = ms.searchOnCache(0, 0, 8)
assert.Equal(t, int64(0), rev)
}
func TestGetRevOnCache(t *testing.T) {
ms := &MetaSnapshot{}
ms.ts2Rev = make([]rtPair, 7)
ms.initTs(7, 16)
ms.initTs(6, 14)
ms.initTs(5, 12)
ms.initTs(4, 10)
var rev int64
rev = ms.getRevOnCache(17)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(9)
assert.Equal(t, int64(0), rev)
rev = ms.getRevOnCache(10)
assert.Equal(t, int64(4), rev)
rev = ms.getRevOnCache(16)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(15)
assert.Equal(t, int64(6), rev)
rev = ms.getRevOnCache(12)
assert.Equal(t, int64(5), rev)
ms.initTs(3, 8)
ms.initTs(2, 6)
assert.Equal(t, ms.maxPos, 6)
assert.Equal(t, ms.minPos, 1)
rev = ms.getRevOnCache(17)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(9)
assert.Equal(t, int64(3), rev)
rev = ms.getRevOnCache(10)
assert.Equal(t, int64(4), rev)
rev = ms.getRevOnCache(16)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(15)
assert.Equal(t, int64(6), rev)
rev = ms.getRevOnCache(12)
assert.Equal(t, int64(5), rev)
rev = ms.getRevOnCache(5)
assert.Equal(t, int64(0), rev)
ms.putTs(8, 18)
assert.Equal(t, ms.maxPos, 0)
assert.Equal(t, ms.minPos, 1)
for rev = 2; rev <= 7; rev++ {
ts := ms.getRevOnCache(typeutil.Timestamp(rev*2 + 3))
assert.Equal(t, rev, ts)
}
ms.putTs(9, 20)
assert.Equal(t, ms.maxPos, 1)
assert.Equal(t, ms.minPos, 2)
assert.Equal(t, ms.numTs, 7)
curMax := ms.maxPos
curMin := ms.minPos
for i := 10; i < 20; i++ {
ms.putTs(int64(i), typeutil.Timestamp(i*2+2))
curMax++
curMin++
if curMax == len(ms.ts2Rev) {
curMax = 0
}
if curMin == len(ms.ts2Rev) {
curMin = 0
}
assert.Equal(t, curMax, ms.maxPos)
assert.Equal(t, curMin, ms.minPos)
}
for i := 13; i < 20; i++ {
rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 2))
assert.Equal(t, int64(i), rev)
rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 3))
assert.Equal(t, int64(i), rev)
}
rev = ms.getRevOnCache(27)
assert.Zero(t, rev)
}
func TestGetRevOnEtcd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
key := path.Join(rootPath, tsKey)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
ms := MetaSnapshot{
cli: etcdCli,
root: rootPath,
tsKey: tsKey,
}
resp, err := etcdCli.Put(ctx, key, "100")
assert.Nil(t, err)
revList := []int64{}
tsList := []typeutil.Timestamp{}
revList = append(revList, resp.Header.Revision)
tsList = append(tsList, 100)
for i := 110; i < 200; i += 10 {
resp, err = etcdCli.Put(ctx, key, fmt.Sprintf("%d", i))
assert.Nil(t, err)
revList = append(revList, resp.Header.Revision)
tsList = append(tsList, typeutil.Timestamp(i))
}
lastRev := revList[len(revList)-1] + 1
for i, ts := range tsList {
rev := ms.getRevOnEtcd(ts, lastRev)
assert.Equal(t, revList[i], rev)
}
for i := 0; i < len(tsList); i++ {
rev := ms.getRevOnEtcd(tsList[i]+5, lastRev)
assert.Equal(t, revList[i], rev)
}
rev := ms.getRevOnEtcd(200, lastRev)
assert.Equal(t, lastRev-1, rev)
rev = ms.getRevOnEtcd(99, lastRev)
assert.Zero(t, rev)
}
func TestLoad(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.Save("key", fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, val, fmt.Sprintf("value-%d", i))
}
val, err := ms.Load("key", 0)
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, val, fmt.Sprintf("value-%d", i))
}
}
func TestMultiSave(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.MultiSave(saves, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i))
assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i))
}
keys, vals, err := ms.LoadWithPrefix("k", 0)
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i))
assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i))
}
}
func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 20; i < 40; i++ {
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
dm := []string{fmt.Sprintf("kd-%04d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.MultiSaveAndRemoveWithPrefix(sm, dm, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, i+1, len(vals))
}
for i := 20; i < 40; i++ {
val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, 39-i, len(vals))
}
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, i+1, len(vals))
}
for i := 20; i < 40; i++ {
val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, 39-i, len(vals))
}
}
func TestTsBackward(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)
kv.Save("a", "b", 100)
kv.Save("a", "c", 99) // backward
kv.Save("a", "d", 200)
kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Error(t, err)
}
func TestFix7150(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.Nil(t, err)
defer etcdCli.Close()
kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)
kv.Save("a", "b", 100)
kv.Save("a", "c", 0) // bug introduced
kv.Save("a", "d", 200)
kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)
}
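
Reviewer note: the expectations in `TestSearchOnCache` and `TestGetRevOnCache` above are consistent with a "floor" lookup: return the revision of the newest entry whose timestamp is not greater than the requested one, and 0 when no such entry exists. The removed implementation keeps the pairs in a ring buffer; the sketch below assumes a plain slice sorted by timestamp instead, and `floorRev` is a hypothetical name:

```go
package main

import (
	"fmt"
	"sort"
)

// rtPair pairs an etcd revision with the timestamp saved at that revision.
type rtPair struct {
	rev int64
	ts  uint64
}

// floorRev returns the revision of the newest pair with ts <= target,
// or 0 when every cached timestamp is greater than target.
// pairs must be sorted by ts in ascending order.
func floorRev(pairs []rtPair, target uint64) int64 {
	// index of the first pair whose ts is strictly greater than target
	idx := sort.Search(len(pairs), func(i int) bool { return pairs[i].ts > target })
	if idx == 0 {
		return 0
	}
	return pairs[idx-1].rev
}

func main() {
	pairs := make([]rtPair, 0, 8)
	for i := 0; i < 8; i++ {
		pairs = append(pairs, rtPair{rev: int64(i * 2), ts: uint64(i * 2)})
	}
	fmt.Println(floorRev(pairs, 9), floorRev(pairs, 14), floorRev(pairs, 1))
}
```

A return value of 0 is ambiguous when revision 0 is a valid entry, as it is in the test data; the tests above treat 0 as a cache miss.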


@ -26,19 +26,24 @@ import (
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
var (
// SuffixSnapshotTombstone special value for tombstone mark
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
PaginationSize = 5000
)
// IsTombstone used in migration tool also.
@ -56,7 +61,7 @@ func ConstructTombstone() []byte {
// SuffixSnapshot record timestamp as prefix of a key under the Snapshot prefix path
type SuffixSnapshot struct {
// internal kv which SuffixSnapshot based on
kv.TxnKV
kv.MetaKv
// rw mutex provided range lock
sync.RWMutex
// lastestTS latest timestamp for each key
@ -79,6 +84,8 @@ type SuffixSnapshot struct {
// exp is the shortcut format checker for ts-key
// composed with separator only
exp *regexp.Regexp
closeGC chan struct{}
}
// tsv struct stores kv with timestamp
@ -91,9 +98,9 @@ type tsv struct {
var _ kv.SnapShotKV = (*SuffixSnapshot)(nil)
// NewSuffixSnapshot creates a SuffixSnapshot backed by the provided kv
func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnapshot, error) {
if txnKV == nil {
return nil, retry.Unrecoverable(errors.New("txnKV is nil"))
func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSnapshot, error) {
if metaKV == nil {
return nil, retry.Unrecoverable(errors.New("MetaKv is nil"))
}
// handles trailing / logic
@ -104,8 +111,8 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
tk = path.Join(root, "k")
rootLen := len(tk) - 1
return &SuffixSnapshot{
TxnKV: txnKV,
ss := &SuffixSnapshot{
MetaKv: metaKV,
lastestTS: make(map[string]typeutil.Timestamp),
separator: sep,
exp: regexp.MustCompile(fmt.Sprintf(`^(.+)%s(\d+)$`, sep)),
@ -113,7 +120,10 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps
snapshotLen: snapshotLen,
rootPrefix: root,
rootLen: rootLen,
}, nil
closeGC: make(chan struct{}, 1),
}
go ss.startBackgroundGC()
return ss, nil
}
// isTombstone helper function to check whether is tombstone mark
@ -200,9 +210,9 @@ func (ss *SuffixSnapshot) checkKeyTS(key string, ts typeutil.Timestamp) (bool, e
// loadLatestTS loads the latest ts for the specified key
func (ss *SuffixSnapshot) loadLatestTS(key string) error {
prefix := ss.composeSnapshotPrefix(key)
keys, _, err := ss.TxnKV.LoadWithPrefix(prefix)
keys, _, err := ss.MetaKv.LoadWithPrefix(prefix)
if err != nil {
log.Warn("SuffixSnapshot txnkv LoadWithPrefix failed", zap.String("key", key),
log.Warn("SuffixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", key),
zap.Error(err))
return err
}
@ -259,14 +269,14 @@ func binarySearchRecords(records []tsv, ts typeutil.Timestamp) (string, bool) {
}
// Save stores key-value pairs with timestamp
// if ts is 0, SuffixSnapshot works as a TxnKV
// if ts is 0, SuffixSnapshot works as a MetaKv
// otherwise, SuffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path
// and for acceleration store original key-value if ts is the latest
func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKv
// if ts == 0, act like MetaKv
// lastestTS is not updated since ts is not valid
if ts == 0 {
return ss.TxnKV.Save(key, value)
return ss.MetaKv.Save(key, value)
}
ss.Lock()
@ -281,7 +291,7 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
return err
}
if after {
err := ss.TxnKV.MultiSave(map[string]string{
err := ss.MetaKv.MultiSave(map[string]string{
key: value,
tsKey: value,
})
@ -293,14 +303,14 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp)
}
// modifying history key, just save tskey-value
return ss.TxnKV.Save(tsKey, value)
return ss.MetaKv.Save(tsKey, value)
}
func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
// if ts == 0, load latest by definition
// thanks to the acceleration logic, loading the original key is enough
if ts == 0 {
value, err := ss.TxnKV.Load(key)
value, err := ss.MetaKv.Load(key)
if ss.isTombstone(value) {
return "", errors.New("no value found")
}
@ -318,7 +328,7 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
return "", err
}
if after {
value, err := ss.TxnKV.Load(key)
value, err := ss.MetaKv.Load(key)
if ss.isTombstone(value) {
return "", errors.New("no value found")
}
@ -327,9 +337,9 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
// before ts, do time travel
// 1. load all tsKey with key/ prefix
keys, values, err := ss.TxnKV.LoadWithPrefix(ss.composeSnapshotPrefix(key))
keys, values, err := ss.MetaKv.LoadWithPrefix(ss.composeSnapshotPrefix(key))
if err != nil {
log.Warn("prefixSnapshot txnKV LoadWithPrefix failed", zap.String("key", key), zap.Error(err))
log.Warn("prefixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", key), zap.Error(err))
return "", err
}
@ -367,12 +377,12 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error
}
// MultiSave save multiple kvs
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
// each key-value will be treated using same logic like Save
func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
if ts == 0 {
return ss.TxnKV.MultiSave(kvs)
return ss.MetaKv.MultiSave(kvs)
}
ss.Lock()
defer ss.Unlock()
@ -385,7 +395,7 @@ func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp
}
// multi save execute map; if succeeds, update ts in the update list
err = ss.TxnKV.MultiSave(execute)
err = ss.MetaKv.MultiSave(execute)
if err == nil {
for _, key := range updateList {
ss.lastestTS[key] = ts
@ -424,7 +434,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil
func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
// ts 0 case shall be treated as fetch latest/current value
if ts == 0 {
keys, values, err := ss.TxnKV.LoadWithPrefix(key)
keys, values, err := ss.MetaKv.LoadWithPrefix(key)
fks := keys[:0] //make([]string, 0, len(keys))
fvs := values[:0] //make([]string, 0, len(values))
// hide rootPrefix from return value
@ -441,7 +451,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
ss.Lock()
defer ss.Unlock()
keys, values, err := ss.TxnKV.LoadWithPrefix(key)
keys, values, err := ss.MetaKv.LoadWithPrefix(key)
if err != nil {
return nil, nil, err
}
@ -456,7 +466,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
for i, key := range keys {
group := kvgroup{key: key, value: values[i]}
// load prefix keys contains rootPrefix
sKeys, sValues, err := ss.TxnKV.LoadWithPrefix(ss.composeSnapshotPrefix(ss.hideRootPrefix(key)))
sKeys, sValues, err := ss.MetaKv.LoadWithPrefix(ss.composeSnapshotPrefix(ss.hideRootPrefix(key)))
if err != nil {
return nil, nil, err
}
@ -500,12 +510,12 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
}
// MultiSaveAndRemoveWithPrefix saves multiple kvs and removes keys by prefix
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
// each key-value will be treated in same logic like Save
func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
// if ts == 0, act like TxnKV
// if ts == 0, act like MetaKv
if ts == 0 {
return ss.TxnKV.MultiSaveAndRemoveWithPrefix(saves, removals)
return ss.MetaKv.MultiSaveAndRemoveWithPrefix(saves, removals)
}
ss.Lock()
defer ss.Unlock()
@ -519,9 +529,9 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
// load each removal, change execution to adding tombstones
for _, removal := range removals {
keys, _, err := ss.TxnKV.LoadWithPrefix(removal)
keys, _, err := ss.MetaKv.LoadWithPrefix(removal)
if err != nil {
log.Warn("SuffixSnapshot TxnKV LoadwithPrefix failed", zap.String("key", removal), zap.Error(err))
log.Warn("SuffixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", removal), zap.Error(err))
return err
}
@ -535,7 +545,7 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
}
// multi save execute map; if succeeds, update ts in the update list
err = ss.TxnKV.MultiSave(execute)
err = ss.MetaKv.MultiSave(execute)
if err == nil {
for _, key := range updateList {
ss.lastestTS[key] = ts
@ -543,3 +553,123 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
}
return err
}
// Close stops the background GC goroutine; it must be called at most once
func (ss *SuffixSnapshot) Close() {
close(ss.closeGC)
}
// startBackgroundGC periodically removes expired snapshot kvs (those saved with ts != 0) until Close is called
func (ss *SuffixSnapshot) startBackgroundGC() {
log.Debug("suffix snapshot GC goroutine start!")
ticker := time.NewTicker(60 * time.Minute)
defer ticker.Stop()
params := paramtable.Get()
retentionDuration := params.CommonCfg.RetentionDuration.GetAsDuration(time.Second)
for {
select {
case <-ss.closeGC:
log.Warn("quit suffix snapshot GC goroutine!")
return
case now := <-ticker.C:
err := ss.removeExpiredKvs(now, retentionDuration)
if err != nil {
log.Warn("remove expired data fail during GC", zap.Error(err))
}
}
}
}
func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) {
if !strings.HasPrefix(snapshotKey, ss.snapshotPrefix) {
return "", fmt.Errorf("get original key failed, invalid snapshot key:%s", snapshotKey)
}
// the original key is the part between the snapshot prefix and the last separator
idx := strings.LastIndex(snapshotKey, ss.separator)
if idx == -1 {
return "", fmt.Errorf("get original key failed, snapshot key:%s", snapshotKey)
}
prefix := snapshotKey[:idx]
return prefix[ss.snapshotLen:], nil
}
func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey string, includeOriginalKey bool) error {
if includeOriginalKey {
keyGroup = append(keyGroup, originalKey)
}
// sort in ascending order so that the latest ts-key, which holds the tombstone, is placed at the tail of the array
sort.Strings(keyGroup)
removeFn := func(partialKeys []string) error {
// remove only the current batch, not the whole key group
return ss.MetaKv.MultiRemove(partialKeys)
}
return etcd.RemoveByBatch(keyGroup, removeFn)
}
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time, retentionDuration time.Duration) error {
keyGroup := make([]string, 0)
latestOriginalKey := ""
latestValue := ""
groupCnt := 0
removeFn := func(curOriginalKey string) error {
if !ss.isTombstone(latestValue) {
return nil
}
return ss.batchRemoveExpiredKvs(keyGroup, curOriginalKey, groupCnt == len(keyGroup))
}
// walk all kvs in ascending order; for each key group we must reach the latest key to check
// whether it holds a tombstone, which means the original key has been removed.
// TODO: walk in descending order
err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error {
key := string(k)
value := string(v)
key = ss.hideRootPrefix(key)
ts, ok := ss.isTSKey(key)
// a key without a ts suffix is not a snapshot ts-key, so skip it
if !ok {
log.Warn("skip key because it doesn't contain ts", zap.String("key", key))
return nil
}
curOriginalKey, err := ss.getOriginalKey(key)
if err != nil {
return err
}
// flush the previous key group when a new key group starts
if latestOriginalKey != "" && latestOriginalKey != curOriginalKey {
// if the latest kv of the previous group is a tombstone, the original key has already been removed,
// so the expired ts-keys of that group can be removed as well
if err := removeFn(latestOriginalKey); err != nil {
return err
}
keyGroup = make([]string, 0)
groupCnt = 0
}
latestValue = value
groupCnt++
latestOriginalKey = curOriginalKey
// parse the physical time of the ts-key and compute when it expires
pts, _ := tsoutil.ParseTS(ts)
expireTime := pts.Add(retentionDuration)
// record the key if its retention period has already passed
if expireTime.Before(now) {
keyGroup = append(keyGroup, key)
}
return nil
})
if err != nil {
return err
}
return removeFn(latestOriginalKey)
}
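
Reviewer note: the per-group decision made by `removeExpiredKvs` and `batchRemoveExpiredKvs` above can be sketched on in-memory data. This is a simplified illustration under stated assumptions, not the code in this commit: `version` and `collectRemovable` are hypothetical names, and expiry is passed in as a flag instead of being derived from the timestamp and the retention duration:

```go
package main

import (
	"fmt"
	"sort"
)

// version is a hypothetical in-memory stand-in for one snapshot ts-key.
type version struct {
	tsKey     string // e.g. "snapshots/k_ts100"
	tombstone bool   // value is the tombstone marker
	expired   bool   // ts + retention is before now
}

// collectRemovable returns the keys to delete for one original key.
// versions must be ordered from oldest to newest.
func collectRemovable(originalKey string, versions []version) []string {
	// nothing is removed unless the latest version is a tombstone
	if len(versions) == 0 || !versions[len(versions)-1].tombstone {
		return nil
	}
	keys := make([]string, 0, len(versions)+1)
	for _, v := range versions {
		if v.expired {
			keys = append(keys, v.tsKey)
		}
	}
	// the original key goes too once every version has expired
	if len(keys) == len(versions) {
		keys = append(keys, originalKey)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	all := []version{
		{tsKey: "snapshots/k_ts1", expired: true},
		{tsKey: "snapshots/k_ts2", tombstone: true, expired: true},
	}
	fmt.Println(collectRemovable("k", all))
}
```

Keeping a group whose latest version is not a tombstone preserves time travel for live keys; only keys that were deleted and have outlived the retention window are reclaimed.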


@ -17,22 +17,37 @@
package rootcoord
import (
"errors"
"fmt"
"math/rand"
"os"
"testing"
"time"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var (
snapshotPrefix = "snapshots"
)
var Params = paramtable.Get()
func TestMain(m *testing.M) {
Params.Init()
code := m.Run()
os.Exit(code)
}
func Test_binarySearchRecords(t *testing.T) {
type testcase struct {
records []tsv
@ -173,6 +188,8 @@ func Test_ComposeIsTsKey(t *testing.T) {
sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
require.Nil(t, err)
defer ss.Close()
type testcase struct {
key string
expected uint64
@ -211,6 +228,8 @@ func Test_SuffixSnaphotIsTSOfKey(t *testing.T) {
sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
require.Nil(t, err)
defer ss.Close()
type testcase struct {
key string
target string
@ -284,6 +303,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
defer ss.Close()
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
@ -302,10 +322,6 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
val, err := ss.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
@ -343,6 +359,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
defer ss.Close()
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
@ -372,9 +389,6 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
keys, vals, err := ss.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
@ -397,6 +411,181 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
ss.RemoveWithPrefix("")
}
func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/remove-expired-test-%d", randVal)
sep := "_ts"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.NoError(t, err)
defer etcdCli.Close()
etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath)
assert.NoError(t, err)
defer etcdkv.Close()
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.NoError(t, err)
assert.NotNil(t, ss)
defer ss.Close()
saveFn := func(key, value string, ts typeutil.Timestamp) {
err = ss.Save(key, value, ts)
assert.NoError(t, err)
}
multiSaveFn := func(kvs map[string]string, ts typeutil.Timestamp) {
err = ss.MultiSave(kvs, ts)
assert.NoError(t, err)
}
now := time.Now()
ftso := func(ts int) typeutil.Timestamp {
return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Millisecond).UnixMilli(), 0)
}
getKey := func(prefix string, id int) string {
return fmt.Sprintf("%s-%d", prefix, id)
}
generateTestData := func(prefix string, kCnt int, kVersion int, expiredKeyCnt int) {
var value string
cnt := 0
for i := 0; i < kVersion; i++ {
kvs := make(map[string]string)
ts := ftso((i + 1) * 100)
for v := 0; v < kCnt; v++ {
if i == 0 && v%2 == 0 && cnt < expiredKeyCnt {
value = string(SuffixSnapshotTombstone)
cnt++
} else {
value = "v"
}
kvs[getKey(prefix, v)] = value
if v%25 == 0 {
multiSaveFn(kvs, ts)
kvs = make(map[string]string)
}
}
multiSaveFn(kvs, ts)
}
}
countPrefix := func(prefix string) int {
cnt := 0
err := etcdkv.WalkWithPrefix("", 10, func(key []byte, value []byte) error {
cnt++
return nil
})
assert.NoError(t, err)
return cnt
}
t.Run("Mixed test ", func(t *testing.T) {
prefix := fmt.Sprintf("prefix%d", rand.Int())
keyCnt := 500
keyVersion := 3
expiredKCnt := 100
generateTestData(prefix, keyCnt, keyVersion, expiredKCnt)
cnt := countPrefix(prefix)
assert.Equal(t, keyCnt*keyVersion+keyCnt, cnt)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
assert.NoError(t, err)
cnt = countPrefix(prefix)
assert.Equal(t, keyCnt*keyVersion+keyCnt-(expiredKCnt*keyVersion+expiredKCnt), cnt)
// clean all data
err := etcdkv.RemoveWithPrefix("")
assert.NoError(t, err)
})
t.Run("partial expired and all expired", func(t *testing.T) {
prefix := fmt.Sprintf("prefix%d", rand.Int())
value := "v"
ts := ftso(100)
saveFn(getKey(prefix, 0), value, ts)
ts = ftso(200)
saveFn(getKey(prefix, 0), value, ts)
ts = ftso(300)
saveFn(getKey(prefix, 0), value, ts)
// insert partial expired kv
ts = ftso(25)
saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts)
ts = ftso(50)
saveFn(getKey(prefix, 1), value, ts)
ts = ftso(70)
saveFn(getKey(prefix, 1), value, ts)
// insert all expired kv
ts = ftso(100)
saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts)
ts = ftso(200)
saveFn(getKey(prefix, 2), value, ts)
ts = ftso(300)
saveFn(getKey(prefix, 2), value, ts)
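cnt := countPrefix(prefix)
// 3 keys, each with 3 snapshot entries plus 1 original entry
assert.Equal(t, 12, cnt)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
assert.NoError(t, err)
cnt = countPrefix(prefix)
// 6 entries are expected to go: all 4 of key 2 and the 2 expired versions of key 1;
// key 0 is kept because its newest version is not a tombstone
assert.Equal(t, 6, cnt)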
// clean all data
err := etcdkv.RemoveWithPrefix("")
assert.NoError(t, err)
})
t.Run("parse ts fail", func(t *testing.T) {
prefix := fmt.Sprintf("prefix%d", rand.Int())
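// save a key under the snapshot prefix that carries no parsable timestamp suffix;
// the assertions below expect removeExpiredKvs to leave it in place
key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts")
err = etcdkv.Save(ss.composeSnapshotPrefix(key), "")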
assert.NoError(t, err)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
assert.NoError(t, err)
cnt := countPrefix(prefix)
assert.Equal(t, 1, cnt)
// clean all data
err := etcdkv.RemoveWithPrefix("")
assert.NoError(t, err)
})
t.Run("test walk kv data fail", func(t *testing.T) {
sep := "_ts"
rootPath := "root/"
kv := mocks.NewMetaKv(t)
kv.EXPECT().
WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("error"))
ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix)
assert.NotNil(t, ss)
assert.NoError(t, err)
err = ss.removeExpiredKvs(time.Now(), time.Duration(100))
assert.Error(t, err)
})
}
func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
@ -427,6 +616,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
defer ss.Close()
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
@ -461,10 +651,6 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
assert.Equal(t, 39-i, len(vals))
}
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
val, err := ss.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
@ -492,3 +678,30 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
// cleanup
ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0)
}
func Test_getOriginalKey(t *testing.T) {
sep := "_ts"
rootPath := "root/"
kv := mocks.NewMetaKv(t)
ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix)
assert.NotNil(t, ss)
assert.NoError(t, err)
t.Run("match prefix fail", func(t *testing.T) {
ret, err := ss.getOriginalKey("non-snapshots/k1")
assert.Equal(t, "", ret)
assert.Error(t, err)
})
t.Run("find separator fail", func(t *testing.T) {
ret, err := ss.getOriginalKey("snapshots/k1")
assert.Equal(t, "", ret)
assert.Error(t, err)
})
t.Run("ok", func(t *testing.T) {
ret, err := ss.getOriginalKey("snapshots/prefix-1_ts438497159122780160")
assert.Equal(t, "prefix-1", ret)
assert.NoError(t, err)
})
}
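
The three subtests above pin down what `getOriginalKey` is expected to do: reject a key outside the snapshot prefix, reject a key without the timestamp separator, and otherwise return the part between the prefix and the separator. A minimal standalone sketch of that contract follows. `originalKey` is a hypothetical helper written for illustration, not the function added by this commit, and the prefix value `snapshots` is an assumption inferred from the keys used in the test.

```go
package main

import (
	"fmt"
	"strings"
)

// originalKey is a hypothetical stand-in that mirrors the behaviour asserted
// in Test_getOriginalKey. It is not the implementation from this commit.
// snapshotPrefix ("snapshots") and separator ("_ts") are assumed values
// taken from the keys in the test.
func originalKey(snapshotKey, snapshotPrefix, separator string) (string, error) {
	prefix := snapshotPrefix + "/"
	// Case 1: the key does not live under the snapshot prefix.
	if !strings.HasPrefix(snapshotKey, prefix) {
		return "", fmt.Errorf("key %q does not start with %q", snapshotKey, prefix)
	}
	rest := strings.TrimPrefix(snapshotKey, prefix)
	// Case 2: the key has no timestamp separator.
	idx := strings.LastIndex(rest, separator)
	if idx < 0 {
		return "", fmt.Errorf("key %q does not contain separator %q", snapshotKey, separator)
	}
	// Case 3: everything before the last separator is the original key.
	return rest[:idx], nil
}

func main() {
	for _, k := range []string{
		"non-snapshots/k1",
		"snapshots/k1",
		"snapshots/prefix-1_ts438497159122780160",
	} {
		orig, err := originalKey(k, "snapshots", "_ts")
		fmt.Printf("key=%q original=%q err=%v\n", k, orig, err)
	}
}
```

Using the last occurrence of the separator is a design choice of this sketch: it keeps original keys that themselves contain `_ts` intact, on the assumption that the timestamp is always the final suffix.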

View File

@ -23,16 +23,16 @@ import (
"path"
"sync"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
// proxyManager manages proxy connected to the rootcoord
@ -77,7 +77,7 @@ func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) {
// WatchProxy starts a goroutine to watch proxy session changes on etcd
func (p *proxyManager) WatchProxy() error {
ctx, cancel := context.WithTimeout(p.ctx, rootcoord.RequestTimeout)
ctx, cancel := context.WithTimeout(p.ctx, etcdkv.RequestTimeout)
defer cancel()
sessions, rev, err := p.getSessionsOnEtcd(ctx)