mirror of https://github.com/milvus-io/milvus.git
enhance: keep the latest snapshot key if the origin key is existed (#38333)
/kind improvement Signed-off-by: SimFG <bang.fu@zilliz.com>pull/38362/head
parent
9ef76971ce
commit
e4a9a473ee
|
@ -89,23 +89,26 @@ func (kv *etcdKV) WalkWithPrefix(ctx context.Context, prefix string, paginationS
|
||||||
key := prefix
|
key := prefix
|
||||||
for {
|
for {
|
||||||
ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||||
defer cancel()
|
|
||||||
resp, err := kv.getEtcdMeta(ctx, key, opts...)
|
resp, err := kv.getEtcdMeta(ctx, key, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cancel()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, kv := range resp.Kvs {
|
for _, kv := range resp.Kvs {
|
||||||
if err = fn(kv.Key, kv.Value); err != nil {
|
if err = fn(kv.Key, kv.Value); err != nil {
|
||||||
|
cancel()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !resp.More {
|
if !resp.More {
|
||||||
|
cancel()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// move to next key
|
// move to next key
|
||||||
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
|
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
|
||||||
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix))
|
CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix))
|
||||||
|
|
|
@ -645,6 +645,10 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(ctx context.Context, keyGroup []
|
||||||
|
|
||||||
// to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array
|
// to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array
|
||||||
sort.Strings(keyGroup)
|
sort.Strings(keyGroup)
|
||||||
|
if !includeOriginalKey && len(keyGroup) > 0 {
|
||||||
|
// keep the latest snapshot key for historical version compatibility
|
||||||
|
keyGroup = keyGroup[0 : len(keyGroup)-1]
|
||||||
|
}
|
||||||
removeFn := func(partialKeys []string) error {
|
removeFn := func(partialKeys []string) error {
|
||||||
return ss.MetaKv.MultiRemove(ctx, partialKeys)
|
return ss.MetaKv.MultiRemove(ctx, partialKeys)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -486,6 +488,15 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||||
return cnt
|
return cnt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getPrefix := func(prefix string) []string {
|
||||||
|
var res []string
|
||||||
|
_ = etcdkv.WalkWithPrefix(context.TODO(), "", 10, func(key []byte, value []byte) error {
|
||||||
|
res = append(res, string(key))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
t.Run("Mixed test ", func(t *testing.T) {
|
t.Run("Mixed test ", func(t *testing.T) {
|
||||||
prefix := fmt.Sprintf("prefix%d", rand.Int())
|
prefix := fmt.Sprintf("prefix%d", rand.Int())
|
||||||
keyCnt := 500
|
keyCnt := 500
|
||||||
|
@ -582,7 +593,12 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
cnt = countPrefix(prefix)
|
cnt = countPrefix(prefix)
|
||||||
assert.Equal(t, 1, cnt)
|
assert.Equal(t, 2, cnt)
|
||||||
|
res := getPrefix(prefix)
|
||||||
|
sort.Strings(res)
|
||||||
|
keepKey := getKey(prefix, 0)
|
||||||
|
keepTs := ftso(100)
|
||||||
|
assert.Equal(t, []string{path.Join(rootPath, keepKey), path.Join(rootPath, ss.composeTSKey(keepKey, keepTs))}, res)
|
||||||
|
|
||||||
// clean all data
|
// clean all data
|
||||||
err := etcdkv.RemoveWithPrefix(context.TODO(), "")
|
err := etcdkv.RemoveWithPrefix(context.TODO(), "")
|
||||||
|
|
Loading…
Reference in New Issue