From e4a9a473ee30bdf22efc83e7d03c827496268a72 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 11 Dec 2024 10:36:43 +0800 Subject: [PATCH] enhance: keep the latest snapshot key if the origin key is existed (#38333) /kind improvement Signed-off-by: SimFG --- internal/kv/etcd/etcd_kv.go | 5 ++++- .../metastore/kv/rootcoord/suffix_snapshot.go | 4 ++++ .../kv/rootcoord/suffix_snapshot_test.go | 18 +++++++++++++++++- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 8e1f421e5f..b6be89d890 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -89,23 +89,26 @@ func (kv *etcdKV) WalkWithPrefix(ctx context.Context, prefix string, paginationS key := prefix for { ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) - defer cancel() resp, err := kv.getEtcdMeta(ctx, key, opts...) if err != nil { + cancel() return err } for _, kv := range resp.Kvs { if err = fn(kv.Key, kv.Value); err != nil { + cancel() return err } } if !resp.More { + cancel() break } // move to next key key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + cancel() } CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix)) diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index f6d6dd99f0..e305bfbfa9 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -645,6 +645,10 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(ctx context.Context, keyGroup [] // to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array sort.Strings(keyGroup) + if !includeOriginalKey && len(keyGroup) > 0 { + // keep the latest snapshot key for historical version compatibility + keyGroup = keyGroup[0 : len(keyGroup)-1] + } removeFn := func(partialKeys []string) error { return ss.MetaKv.MultiRemove(ctx, partialKeys) } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index ec33cc364d..00887efb00 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -21,6 +21,8 @@ import ( "fmt" "math/rand" "os" + "path" + "sort" "testing" "time" @@ -486,6 +488,15 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { return cnt } + getPrefix := func(prefix string) []string { + var res []string + _ = etcdkv.WalkWithPrefix(context.TODO(), "", 10, func(key []byte, value []byte) error { + res = append(res, string(key)) + return nil + }) + return res + } + t.Run("Mixed test ", func(t *testing.T) { prefix := fmt.Sprintf("prefix%d", rand.Int()) keyCnt := 500 @@ -582,7 +593,12 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { assert.NoError(t, err) cnt = countPrefix(prefix) - assert.Equal(t, 1, cnt) + assert.Equal(t, 2, cnt) + res := getPrefix(prefix) + sort.Strings(res) + keepKey := getKey(prefix, 0) + keepTs := ftso(100) + assert.Equal(t, []string{path.Join(rootPath, keepKey), path.Join(rootPath, ss.composeTSKey(keepKey, keepTs))}, res) // clean all data err := etcdkv.RemoveWithPrefix(context.TODO(), "")