Add based on timetravel GC for snapshot KV (#21417) (#21763)

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/21788/head
jaime 2023-01-18 10:15:44 +08:00 committed by GitHub
parent 48ea86128d
commit 608e9c4405
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 311 additions and 940 deletions

View File

@ -1,410 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"fmt"
"path"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
// RequestTimeout timeout for request
RequestTimeout = 10 * time.Second
)
type rtPair struct {
rev int64
ts typeutil.Timestamp
}
type MetaSnapshot struct {
cli *clientv3.Client
root string
tsKey string
lock sync.RWMutex
ts2Rev []rtPair
minPos int
maxPos int
numTs int
}
func NewMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*MetaSnapshot, error) {
if bufSize <= 0 {
bufSize = 1024
}
ms := &MetaSnapshot{
cli: cli,
root: root,
tsKey: tsKey,
lock: sync.RWMutex{},
ts2Rev: make([]rtPair, bufSize),
minPos: 0,
maxPos: 0,
numTs: 0,
}
if err := ms.loadTs(); err != nil {
return nil, err
}
return ms, nil
}
func (ms *MetaSnapshot) loadTs() error {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
key := path.Join(ms.root, ms.tsKey)
resp, err := ms.cli.Get(ctx, key)
if err != nil {
return err
}
if len(resp.Kvs) <= 0 {
return nil
}
version := resp.Kvs[0].Version
revision := resp.Kvs[0].ModRevision
createRevision := resp.Kvs[0].CreateRevision
strTs := string(resp.Kvs[0].Value)
ts, err := strconv.ParseUint(strTs, 10, 64)
if err != nil {
return err
}
log.Info("Load last ts", zap.Int64("version", version), zap.Int64("revision", revision))
ms.initTs(revision, ts)
// start from revision-1, until equals to create revision
for revision--; revision >= createRevision; revision-- {
if ms.numTs == len(ms.ts2Rev) {
break
}
resp, err = ms.cli.Get(ctx, key, clientv3.WithRev(revision))
if err != nil {
return err
}
if len(resp.Kvs) <= 0 {
return nil
}
curVer := resp.Kvs[0].Version
curRev := resp.Kvs[0].ModRevision
if curVer > version {
log.Warn("version go backwards", zap.Int64("curVer", curVer), zap.Int64("version", version))
return nil
}
if curVer == version {
log.Info("Snapshot found save version with different revision", zap.Int64("revision", revision), zap.Int64("version", version))
}
strTs := string(resp.Kvs[0].Value)
if strTs == "0" {
//#issue 7150, index building inserted "0", skipping
//this is a special fix for backward compatibility, the previous version will put 0 ts into the Snapshot building index
continue
}
curTs, err := strconv.ParseUint(strTs, 10, 64)
if err != nil {
return err
}
if curTs >= ts {
return fmt.Errorf("timestamp go back, curTs=%d,ts=%d", curTs, ts)
}
ms.initTs(curRev, curTs)
ts = curTs
revision = curRev
version = curVer
}
return nil
}
func (ms *MetaSnapshot) maxTs() typeutil.Timestamp {
return ms.ts2Rev[ms.maxPos].ts
}
func (ms *MetaSnapshot) minTs() typeutil.Timestamp {
return ms.ts2Rev[ms.minPos].ts
}
func (ms *MetaSnapshot) initTs(rev int64, ts typeutil.Timestamp) {
log.Info("init meta Snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
if ms.numTs == 0 {
ms.maxPos = len(ms.ts2Rev) - 1
ms.minPos = len(ms.ts2Rev) - 1
ms.numTs = 1
ms.ts2Rev[ms.maxPos].rev = rev
ms.ts2Rev[ms.maxPos].ts = ts
} else if ms.numTs < len(ms.ts2Rev) {
ms.minPos--
ms.numTs++
ms.ts2Rev[ms.minPos].rev = rev
ms.ts2Rev[ms.minPos].ts = ts
}
}
func (ms *MetaSnapshot) putTs(rev int64, ts typeutil.Timestamp) {
log.Info("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts))
ms.maxPos++
if ms.maxPos == len(ms.ts2Rev) {
ms.maxPos = 0
}
ms.ts2Rev[ms.maxPos].rev = rev
ms.ts2Rev[ms.maxPos].ts = ts
if ms.numTs < len(ms.ts2Rev) {
ms.numTs++
} else {
ms.minPos++
if ms.minPos == len(ms.ts2Rev) {
ms.minPos = 0
}
}
}
func (ms *MetaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 {
if length == 1 {
return ms.ts2Rev[start].rev
}
begin := start
end := begin + length
mid := (begin + end) / 2
for {
if ms.ts2Rev[mid].ts == ts {
return ms.ts2Rev[mid].rev
}
if mid == begin {
if ms.ts2Rev[mid].ts < ts || mid == start {
return ms.ts2Rev[mid].rev
}
return ms.ts2Rev[mid-1].rev
}
if ms.ts2Rev[mid].ts > ts {
end = mid
} else if ms.ts2Rev[mid].ts < ts {
begin = mid + 1
}
mid = (begin + end) / 2
}
}
func (ms *MetaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 {
if ms.numTs == 0 {
return 0
}
if ts >= ms.ts2Rev[ms.maxPos].ts {
return ms.ts2Rev[ms.maxPos].rev
}
if ts < ms.ts2Rev[ms.minPos].ts {
return 0
}
if ms.maxPos > ms.minPos {
return ms.searchOnCache(ts, ms.minPos, ms.maxPos-ms.minPos+1)
}
topVal := ms.ts2Rev[len(ms.ts2Rev)-1]
botVal := ms.ts2Rev[0]
minVal := ms.ts2Rev[ms.minPos]
maxVal := ms.ts2Rev[ms.maxPos]
if ts >= topVal.ts && ts < botVal.ts {
return topVal.rev
} else if ts >= minVal.ts && ts < topVal.ts {
return ms.searchOnCache(ts, ms.minPos, len(ms.ts2Rev)-ms.minPos)
} else if ts >= botVal.ts && ts < maxVal.ts {
return ms.searchOnCache(ts, 0, ms.maxPos+1)
}
return 0
}
func (ms *MetaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 {
if rev < 2 {
return 0
}
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
for rev--; rev >= 2; rev-- {
resp, err := ms.cli.Get(ctx, path.Join(ms.root, ms.tsKey), clientv3.WithRev(rev))
if err != nil {
log.Info("get ts from etcd failed", zap.Error(err))
return 0
}
if len(resp.Kvs) <= 0 {
return 0
}
rev = resp.Kvs[0].ModRevision
curTs, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
if err != nil {
log.Info("parse timestam error", zap.String("input", string(resp.Kvs[0].Value)), zap.Error(err))
return 0
}
if curTs <= ts {
return rev
}
}
return 0
}
func (ms *MetaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) {
rev := ms.getRevOnCache(ts)
if rev > 0 {
return rev, nil
}
rev = ms.ts2Rev[ms.minPos].rev
rev = ms.getRevOnEtcd(ts, rev)
if rev > 0 {
return rev, nil
}
return 0, fmt.Errorf("can't find revision on ts=%d", ts)
}
func (ms *MetaSnapshot) Save(key, value string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
strTs := strconv.FormatInt(int64(ts), 10)
resp, err := ms.cli.Txn(ctx).If().Then(
clientv3.OpPut(path.Join(ms.root, key), value),
clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs),
).Commit()
if err != nil {
return err
}
ms.putTs(resp.Header.Revision, ts)
return nil
}
func (ms *MetaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var resp *clientv3.GetResponse
var err error
var rev int64
if ts == 0 {
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key))
if err != nil {
return "", err
}
} else {
rev, err = ms.getRev(ts)
if err != nil {
return "", err
}
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithRev(rev))
if err != nil {
return "", err
}
}
if len(resp.Kvs) == 0 {
return "", fmt.Errorf("there is no value on key = %s, ts = %d", key, ts)
}
return string(resp.Kvs[0].Value), nil
}
func (ms *MetaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ops := make([]clientv3.Op, 0, len(kvs)+1)
for key, value := range kvs {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return err
}
ms.putTs(resp.Header.Revision, ts)
return nil
}
func (ms *MetaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) {
ms.lock.RLock()
defer ms.lock.RUnlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
var resp *clientv3.GetResponse
var err error
var rev int64
if ts == 0 {
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
} else {
rev, err = ms.getRev(ts)
if err != nil {
return nil, nil, err
}
resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithRev(rev), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, err
}
}
keys := make([]string, 0, len(resp.Kvs))
values := make([]string, 0, len(resp.Kvs))
tk := path.Join(ms.root, "k")
prefixLen := len(tk) - 1
for _, kv := range resp.Kvs {
tk = string(kv.Key)
tk = tk[prefixLen:]
keys = append(keys, tk)
values = append(values, string(kv.Value))
}
return keys, values, nil
}
func (ms *MetaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
ms.lock.Lock()
defer ms.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1)
for key, value := range saves {
ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value))
}
strTs := strconv.FormatInt(int64(ts), 10)
for _, key := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key), clientv3.WithPrefix()))
}
ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs))
resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit()
if err != nil {
return err
}
ms.putTs(resp.Header.Revision, ts)
return nil
}

View File

@ -1,519 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"fmt"
"math/rand"
"os"
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
var Params paramtable.ComponentParam
func TestMain(m *testing.M) {
Params.Init()
code := m.Run()
os.Exit(code)
}
func TestMetaSnapshot(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 8; i++ {
vtso = typeutil.Timestamp(100 + i)
ts := ftso()
err = ms.Save("abc", fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
_, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i))
assert.Nil(t, err)
}
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 4)
assert.Nil(t, err)
assert.NotNil(t, ms)
}
func TestSearchOnCache(t *testing.T) {
ms := &MetaSnapshot{}
for i := 0; i < 8; i++ {
ms.ts2Rev = append(ms.ts2Rev,
rtPair{
rev: int64(i * 2),
ts: typeutil.Timestamp(i * 2),
})
}
rev := ms.searchOnCache(9, 0, 8)
assert.Equal(t, int64(8), rev)
rev = ms.searchOnCache(1, 0, 2)
assert.Equal(t, int64(0), rev)
rev = ms.searchOnCache(1, 0, 8)
assert.Equal(t, int64(0), rev)
rev = ms.searchOnCache(14, 0, 8)
assert.Equal(t, int64(14), rev)
rev = ms.searchOnCache(0, 0, 8)
assert.Equal(t, int64(0), rev)
}
func TestGetRevOnCache(t *testing.T) {
ms := &MetaSnapshot{}
ms.ts2Rev = make([]rtPair, 7)
ms.initTs(7, 16)
ms.initTs(6, 14)
ms.initTs(5, 12)
ms.initTs(4, 10)
var rev int64
rev = ms.getRevOnCache(17)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(9)
assert.Equal(t, int64(0), rev)
rev = ms.getRevOnCache(10)
assert.Equal(t, int64(4), rev)
rev = ms.getRevOnCache(16)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(15)
assert.Equal(t, int64(6), rev)
rev = ms.getRevOnCache(12)
assert.Equal(t, int64(5), rev)
ms.initTs(3, 8)
ms.initTs(2, 6)
assert.Equal(t, ms.maxPos, 6)
assert.Equal(t, ms.minPos, 1)
rev = ms.getRevOnCache(17)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(9)
assert.Equal(t, int64(3), rev)
rev = ms.getRevOnCache(10)
assert.Equal(t, int64(4), rev)
rev = ms.getRevOnCache(16)
assert.Equal(t, int64(7), rev)
rev = ms.getRevOnCache(15)
assert.Equal(t, int64(6), rev)
rev = ms.getRevOnCache(12)
assert.Equal(t, int64(5), rev)
rev = ms.getRevOnCache(5)
assert.Equal(t, int64(0), rev)
ms.putTs(8, 18)
assert.Equal(t, ms.maxPos, 0)
assert.Equal(t, ms.minPos, 1)
for rev = 2; rev <= 7; rev++ {
ts := ms.getRevOnCache(typeutil.Timestamp(rev*2 + 3))
assert.Equal(t, rev, ts)
}
ms.putTs(9, 20)
assert.Equal(t, ms.maxPos, 1)
assert.Equal(t, ms.minPos, 2)
assert.Equal(t, ms.numTs, 7)
curMax := ms.maxPos
curMin := ms.minPos
for i := 10; i < 20; i++ {
ms.putTs(int64(i), typeutil.Timestamp(i*2+2))
curMax++
curMin++
if curMax == len(ms.ts2Rev) {
curMax = 0
}
if curMin == len(ms.ts2Rev) {
curMin = 0
}
assert.Equal(t, curMax, ms.maxPos)
assert.Equal(t, curMin, ms.minPos)
}
for i := 13; i < 20; i++ {
rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 2))
assert.Equal(t, int64(i), rev)
rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 3))
assert.Equal(t, int64(i), rev)
}
rev = ms.getRevOnCache(27)
assert.Zero(t, rev)
}
func TestGetRevOnEtcd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
key := path.Join(rootPath, tsKey)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
ms := MetaSnapshot{
cli: etcdCli,
root: rootPath,
tsKey: tsKey,
}
resp, err := etcdCli.Put(ctx, key, "100")
assert.Nil(t, err)
revList := []int64{}
tsList := []typeutil.Timestamp{}
revList = append(revList, resp.Header.Revision)
tsList = append(tsList, 100)
for i := 110; i < 200; i += 10 {
resp, err = etcdCli.Put(ctx, key, fmt.Sprintf("%d", i))
assert.Nil(t, err)
revList = append(revList, resp.Header.Revision)
tsList = append(tsList, typeutil.Timestamp(i))
}
lastRev := revList[len(revList)-1] + 1
for i, ts := range tsList {
rev := ms.getRevOnEtcd(ts, lastRev)
assert.Equal(t, revList[i], rev)
}
for i := 0; i < len(tsList); i++ {
rev := ms.getRevOnEtcd(tsList[i]+5, lastRev)
assert.Equal(t, revList[i], rev)
}
rev := ms.getRevOnEtcd(200, lastRev)
assert.Equal(t, lastRev-1, rev)
rev = ms.getRevOnEtcd(99, lastRev)
assert.Zero(t, rev)
}
func TestLoad(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.Save("key", fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, val, fmt.Sprintf("value-%d", i))
}
val, err := ms.Load("key", 0)
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, val, fmt.Sprintf("value-%d", i))
}
}
func TestMultiSave(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.MultiSave(saves, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i))
assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i))
}
keys, vals, err := ms.LoadWithPrefix("k", 0)
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, len(keys), len(vals))
assert.Equal(t, len(keys), 2)
assert.Equal(t, keys[0], "k1")
assert.Equal(t, keys[1], "k2")
assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i))
assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i))
}
}
func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
var vtso typeutil.Timestamp
ftso := func() typeutil.Timestamp {
return vtso
}
defer etcdCli.Close()
ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 20; i < 40; i++ {
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
dm := []string{fmt.Sprintf("kd-%04d", i-20)}
vtso = typeutil.Timestamp(100 + i*5)
ts := ftso()
err = ms.MultiSaveAndRemoveWithPrefix(sm, dm, ts)
assert.Nil(t, err)
assert.Equal(t, vtso, ts)
}
for i := 0; i < 20; i++ {
val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, i+1, len(vals))
}
for i := 20; i < 40; i++ {
val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, 39-i, len(vals))
}
ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11)
assert.Nil(t, err)
assert.NotNil(t, ms)
for i := 0; i < 20; i++ {
val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, i+1, len(vals))
}
for i := 20; i < 40; i++ {
val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
_, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
assert.Equal(t, 39-i, len(vals))
}
}
func TestTsBackward(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)
kv.Save("a", "b", 100)
kv.Save("a", "c", 99) // backward
kv.Save("a", "d", 200)
kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Error(t, err)
}
func TestFix7150(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)
kv.Save("a", "b", 100)
kv.Save("a", "c", 0) // bug introduced
kv.Save("a", "d", 200)
kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024)
assert.Nil(t, err)
err = kv.loadTs()
assert.Nil(t, err)
}

View File

@ -26,13 +26,17 @@ import (
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -80,6 +84,8 @@ type SuffixSnapshot struct {
// exp is the shortcut format checker for ts-key
// composed with separator only
exp *regexp.Regexp
closeGC chan struct{}
}
// tsv struct stores kv with timestamp
@ -114,7 +120,10 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna
snapshotLen: snapshotLen,
rootPrefix: root,
rootLen: rootLen,
closeGC: make(chan struct{}, 1),
}
go ss.startBackgroundGC()
return ss, nil
}
@ -540,6 +549,35 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string,
return err
}
func (ss *SuffixSnapshot) Close() {
close(ss.closeGC)
}
// startBackgroundGC the data will clean up if key ts!=0 and expired
func (ss *SuffixSnapshot) startBackgroundGC() {
log.Debug("suffix snapshot GC goroutine start!")
var params paramtable.ComponentParam
params.Init()
ticker := time.NewTicker(60 * time.Minute)
defer ticker.Stop()
retentionDuration := time.Duration(params.CommonCfg.RetentionDuration) * time.Second
for {
select {
case <-ss.closeGC:
log.Warn("quit suffix snapshot GC goroutine!")
return
case now := <-ticker.C:
err := ss.removeExpiredKvs(now, retentionDuration)
if err != nil {
log.Warn("remove expired data fail during GC", zap.Error(err))
}
}
}
}
func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) {
if !strings.HasPrefix(snapshotKey, ss.snapshotPrefix) {
return "", fmt.Errorf("get original key failed, invailed snapshot key:%s", snapshotKey)
@ -552,3 +590,83 @@ func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) {
prefix := snapshotKey[:idx]
return prefix[ss.snapshotLen:], nil
}
func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey string, includeOriginalKey bool) error {
if includeOriginalKey {
keyGroup = append(keyGroup, originalKey)
}
// to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array
sort.Strings(keyGroup)
removeFn := func(partialKeys []string) error {
return ss.MetaKv.MultiRemove(keyGroup)
}
return etcd.RemoveByBatch(keyGroup, removeFn)
}
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time, retentionDuration time.Duration) error {
keyGroup := make([]string, 0)
latestOriginalKey := ""
latestValue := ""
groupCnt := 0
removeFn := func(curOriginalKey string) error {
if !ss.isTombstone(latestValue) {
return nil
}
return ss.batchRemoveExpiredKvs(keyGroup, curOriginalKey, groupCnt == len(keyGroup))
}
// walk all kvs with SortAsc, we need walk to the latest key for each key group to check the kv
// whether contains tombstone, then if so, it represents the original key has been removed.
// TODO: walk with Desc
err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error {
key := string(k)
value := string(v)
key = ss.hideRootPrefix(key)
ts, ok := ss.isTSKey(key)
// it is original key if the key doesn't contain ts
if !ok {
log.Warn("skip key because it doesn't contain ts", zap.String("key", key))
return nil
}
curOriginalKey, err := ss.getOriginalKey(key)
if err != nil {
return err
}
// reset if starting look up a new key group
if latestOriginalKey != "" && latestOriginalKey != curOriginalKey {
// it indicates all keys need to remove that the prefix is original key
// it means the latest original kvs has already been removed if the latest kv has tombstone marker.
if err := removeFn(latestOriginalKey); err != nil {
return err
}
keyGroup = make([]string, 0)
groupCnt = 0
}
latestValue = value
groupCnt++
latestOriginalKey = curOriginalKey
// record keys if the kv is expired
pts, _ := tsoutil.ParseTS(ts)
expireTime := pts.Add(retentionDuration)
// break loop if it reaches expire time
if expireTime.Before(now) {
keyGroup = append(keyGroup, key)
}
return nil
})
if err != nil {
return err
}
return removeFn(latestOriginalKey)
}

View File

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/rand"
"os"
"testing"
"time"
@ -30,6 +31,8 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -37,6 +40,14 @@ var (
snapshotPrefix = "snapshots"
)
var Params paramtable.ComponentParam
func TestMain(m *testing.M) {
Params.Init()
code := m.Run()
os.Exit(code)
}
func Test_binarySearchRecords(t *testing.T) {
type testcase struct {
records []tsv
@ -177,6 +188,8 @@ func Test_ComposeIsTsKey(t *testing.T) {
sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
require.Nil(t, err)
defer ss.Close()
type testcase struct {
key string
expected uint64
@ -215,6 +228,8 @@ func Test_SuffixSnaphotIsTSOfKey(t *testing.T) {
sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix)
require.Nil(t, err)
defer ss.Close()
type testcase struct {
key string
target string
@ -288,6 +303,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
defer ss.Close()
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
@ -306,10 +322,6 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "value-19", val)
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
val, err := ss.Load("key", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
@ -347,6 +359,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
defer ss.Close()
for i := 0; i < 20; i++ {
saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)}
@ -376,9 +389,6 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
assert.Equal(t, vals[0], "v1-19")
assert.Equal(t, vals[1], "v2-19")
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
keys, vals, err := ss.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)
@ -401,6 +411,181 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
ss.RemoveWithPrefix("")
}
func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
rootPath := fmt.Sprintf("/test/meta/remove-expired-test-%d", randVal)
sep := "_ts"
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()
etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath)
assert.NoError(t, err)
defer etcdkv.Close()
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.NoError(t, err)
assert.NotNil(t, ss)
defer ss.Close()
saveFn := func(key, value string, ts typeutil.Timestamp) {
err = ss.Save(key, value, ts)
assert.NoError(t, err)
}
multiSaveFn := func(kvs map[string]string, ts typeutil.Timestamp) {
err = ss.MultiSave(kvs, ts)
assert.NoError(t, err)
}
now := time.Now()
ftso := func(ts int) typeutil.Timestamp {
return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Millisecond).UnixMilli(), 0)
}
getKey := func(prefix string, id int) string {
return fmt.Sprintf("%s-%d", prefix, id)
}
generateTestData := func(prefix string, kCnt int, kVersion int, expiredKeyCnt int) {
var value string
cnt := 0
for i := 0; i < kVersion; i++ {
kvs := make(map[string]string)
ts := ftso((i + 1) * 100)
for v := 0; v < kCnt; v++ {
if i == 0 && v%2 == 0 && cnt < expiredKeyCnt {
value = string(SuffixSnapshotTombstone)
cnt++
} else {
value = "v"
}
kvs[getKey(prefix, v)] = value
if v%25 == 0 {
multiSaveFn(kvs, ts)
kvs = make(map[string]string)
}
}
multiSaveFn(kvs, ts)
}
}
countPrefix := func(prefix string) int {
cnt := 0
err := etcdkv.WalkWithPrefix("", 10, func(key []byte, value []byte) error {
cnt++
return nil
})
assert.NoError(t, err)
return cnt
}
t.Run("Mixed test ", func(t *testing.T) {
prefix := fmt.Sprintf("prefix%d", rand.Int())
keyCnt := 500
keyVersion := 3
expiredKCnt := 100
generateTestData(prefix, keyCnt, keyVersion, expiredKCnt)
cnt := countPrefix(prefix)
assert.Equal(t, keyCnt*keyVersion+keyCnt, cnt)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
assert.NoError(t, err)
cnt = countPrefix(prefix)
assert.Equal(t, keyCnt*keyVersion+keyCnt-(expiredKCnt*keyVersion+expiredKCnt), cnt)
// clean all data
err := etcdkv.RemoveWithPrefix("")
assert.NoError(t, err)
})
t.Run("partial expired and all expired", func(t *testing.T) {
prefix := fmt.Sprintf("prefix%d", rand.Int())
value := "v"
ts := ftso(100)
saveFn(getKey(prefix, 0), value, ts)
ts = ftso(200)
saveFn(getKey(prefix, 0), value, ts)
ts = ftso(300)
saveFn(getKey(prefix, 0), value, ts)
// insert partial expired kv
ts = ftso(25)
saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts)
ts = ftso(50)
saveFn(getKey(prefix, 1), value, ts)
ts = ftso(70)
saveFn(getKey(prefix, 1), value, ts)
// insert all expired kv
ts = ftso(100)
saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts)
ts = ftso(200)
saveFn(getKey(prefix, 2), value, ts)
ts = ftso(300)
saveFn(getKey(prefix, 2), value, ts)
cnt := countPrefix(prefix)
assert.Equal(t, 12, cnt)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
assert.NoError(t, err)
cnt = countPrefix(prefix)
assert.Equal(t, 6, cnt)
// clean all data
err := etcdkv.RemoveWithPrefix("")
assert.NoError(t, err)
})
t.Run("parse ts fail", func(t *testing.T) {
prefix := fmt.Sprintf("prefix%d", rand.Int())
key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts")
err = etcdkv.Save(ss.composeSnapshotPrefix(key), "")
assert.NoError(t, err)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
assert.NoError(t, err)
cnt := countPrefix(prefix)
assert.Equal(t, 1, cnt)
// clean all data
err := etcdkv.RemoveWithPrefix("")
assert.NoError(t, err)
})
t.Run("test walk kv data fail", func(t *testing.T) {
sep := "_ts"
rootPath := "root/"
kv := mocks.NewMetaKv(t)
kv.EXPECT().
WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("error"))
ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix)
assert.NotNil(t, ss)
assert.NoError(t, err)
err = ss.removeExpiredKvs(time.Now(), time.Duration(100))
assert.Error(t, err)
})
}
func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
@ -431,6 +616,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
defer ss.Close()
for i := 0; i < 20; i++ {
vtso = typeutil.Timestamp(100 + i*5)
@ -465,10 +651,6 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
assert.Equal(t, 39-i, len(vals))
}
ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
assert.Nil(t, err)
assert.NotNil(t, ss)
for i := 0; i < 20; i++ {
val, err := ss.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
assert.Nil(t, err)