From f85af0732cbd58a6336877d4314b6f148627e8df Mon Sep 17 00:00:00 2001 From: yiwangdr <80064917+yiwangdr@users.noreply.github.com> Date: Tue, 12 Sep 2023 00:07:17 -0700 Subject: [PATCH] remove MultiRemoveWithPrefix (#26924) Signed-off-by: yiwangdr --- internal/kv/etcd/embed_etcd_kv.go | 13 ----- internal/kv/etcd/embed_etcd_kv_test.go | 77 +------------------------- internal/kv/etcd/etcd_kv.go | 19 ------- internal/kv/etcd/etcd_kv_test.go | 45 +-------------- internal/kv/kv.go | 1 - internal/kv/mem/mem_kv.go | 6 -- internal/kv/rocksdb/rocksdb_kv.go | 12 ---- internal/kv/rocksdb/rocksdb_kv_test.go | 26 +-------- internal/kv/tikv/txn_tikv.go | 58 ------------------- internal/kv/tikv/txn_tikv_test.go | 49 +--------------- 10 files changed, 9 insertions(+), 297 deletions(-) diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 91cf404397..5cfec20c61 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -475,19 +475,6 @@ func (kv *EmbedEtcdKV) WatchWithRevision(key string, revision int64) clientv3.Wa return rch } -func (kv *EmbedEtcdKV) MultiRemoveWithPrefix(keys []string) error { - ops := make([]clientv3.Op, 0, len(keys)) - for _, k := range keys { - op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix()) - ops = append(ops, op) - } - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() - return err -} - // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { ops := make([]clientv3.Op, 0, len(saves)+len(removals)) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 314762f03f..03205a992c 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -522,7 +522,7 @@ func TestEmbedEtcd(te *testing.T) { assert.Empty(t, vs) }) - te.Run("etcdKV MultiRemoveWithPrefix", func(t *testing.T) { + te.Run("etcdKV MultiSaveAndRemoveWithPrefix", func(t *testing.T) { rootPath := "/etcd/test/root/multi_remove_with_prefix" metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) require.NoError(t, err) @@ -539,45 +539,6 @@ func TestEmbedEtcd(te *testing.T) { "x/den/2": "200", } - err = metaKv.MultiSave(prepareTests) - require.NoError(t, err) - - multiRemoveWithPrefixTests := []struct { - prefix []string - - testKey string - expectedValue string - }{ - {[]string{"x/abc"}, "x/abc/1", ""}, - {[]string{}, "x/abc/2", ""}, - {[]string{}, "x/def/1", "10"}, - {[]string{}, "x/def/2", "20"}, - {[]string{}, "x/den/1", "100"}, - {[]string{}, "x/den/2", "200"}, - {[]string{}, "not-exist", ""}, - {[]string{"x/def", "x/den"}, "x/def/1", ""}, - {[]string{}, "x/def/1", ""}, - {[]string{}, "x/def/2", ""}, - {[]string{}, "x/den/1", ""}, - {[]string{}, "x/den/2", ""}, - {[]string{}, "not-exist", ""}, - } - - for _, test := range multiRemoveWithPrefixTests { - if len(test.prefix) > 0 { - err = metaKv.MultiRemoveWithPrefix(test.prefix) - assert.NoError(t, err) - } - - v, _ := metaKv.Load(test.testKey) - assert.Equal(t, test.expectedValue, v) - } - - k, v, err := metaKv.LoadWithPrefix("/") - assert.NoError(t, err) - assert.Zero(t, len(k)) - assert.Zero(t, len(v)) - // MultiSaveAndRemoveWithPrefix err = metaKv.MultiSave(prepareTests) require.NoError(t, err) @@ -597,7 +558,7 @@ func TestEmbedEtcd(te *testing.T) { } for _, test := range multiSaveAndRemoveWithPrefixTests { - k, _, err = metaKv.LoadWithPrefix(test.loadPrefix) + k, _, err := metaKv.LoadWithPrefix(test.loadPrefix) assert.NoError(t, err) assert.Equal(t, test.lengthBeforeRemove, len(k)) @@ -628,40 +589,6 @@ func TestEmbedEtcd(te *testing.T) { "x/den/2": []byte("200"), } - err = metaKv.MultiSaveBytes(prepareTests) - require.NoError(t, err) - - multiRemoveWithPrefixTests := []struct { - prefix []string - - testKey string - expectedValue []byte - }{ - {[]string{"x/abc"}, "x/abc/1", nil}, - {[]string{}, "x/abc/2", nil}, - {[]string{}, "x/def/1", []byte("10")}, - {[]string{}, "x/def/2", []byte("20")}, - {[]string{}, "x/den/1", []byte("100")}, - {[]string{}, "x/den/2", []byte("200")}, - {[]string{}, "not-exist", nil}, - {[]string{"x/def", "x/den"}, "x/def/1", nil}, - {[]string{}, "x/def/1", nil}, - {[]string{}, "x/def/2", nil}, - {[]string{}, "x/den/1", nil}, - {[]string{}, "x/den/2", nil}, - {[]string{}, "not-exist", nil}, - } - - for _, test := range multiRemoveWithPrefixTests { - if len(test.prefix) > 0 { - err = metaKv.MultiRemoveWithPrefix(test.prefix) - assert.NoError(t, err) - } - - v, _ := metaKv.LoadBytes(test.testKey) - assert.Equal(t, test.expectedValue, v) - } - k, v, err := metaKv.LoadBytesWithPrefix("/") assert.NoError(t, err) assert.Zero(t, len(k)) diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index acb211133f..7a071266e4 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -529,25 +529,6 @@ func (kv *etcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchCh return rch } -// MultiRemoveWithPrefix removes the keys with given prefix. -func (kv *etcdKV) MultiRemoveWithPrefix(keys []string) error { - start := time.Now() - ops := make([]clientv3.Op, 0, len(keys)) - for _, k := range keys { - op := clientv3.OpDelete(path.Join(kv.rootPath, k), clientv3.WithPrefix()) - ops = append(ops, op) - } - ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) - defer cancel() - - _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) - if err != nil { - log.Warn("Etcd MultiRemoveWithPrefix error", zap.Strings("keys", keys), zap.Int("len", len(keys)), zap.Error(err)) - } - CheckElapseAndWarn(start, "Slow etcd operation multi remove with prefix", zap.Strings("keys", keys)) - return err -} - // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { start := time.Now() diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 2548c0bf45..7cabcb8017 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -551,8 +551,8 @@ func TestEtcdKV_Load(te *testing.T) { assert.Empty(t, vs) }) - te.Run("etcdKV MultiRemoveWithPrefix", func(t *testing.T) { - rootPath := "/etcd/test/root/multi_remove_with_prefix" + te.Run("etcdKV MultiSaveAndRemoveWithPrefix", func(t *testing.T) { + rootPath := "/etcd/test/root/multi_save_and_remove_with_prefix" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") @@ -566,45 +566,6 @@ func TestEtcdKV_Load(te *testing.T) { "x/den/2": "200", } - err = etcdKV.MultiSave(prepareTests) - require.NoError(t, err) - - multiRemoveWithPrefixTests := []struct { - prefix []string - - testKey string - expectedValue string - }{ - {[]string{"x/abc"}, "x/abc/1", ""}, - {[]string{}, "x/abc/2", ""}, - {[]string{}, "x/def/1", "10"}, - {[]string{}, "x/def/2", "20"}, - {[]string{}, "x/den/1", "100"}, - {[]string{}, "x/den/2", "200"}, - {[]string{}, "not-exist", ""}, - {[]string{"x/def", "x/den"}, "x/def/1", ""}, - {[]string{}, "x/def/1", ""}, - {[]string{}, "x/def/2", ""}, - {[]string{}, "x/den/1", ""}, - {[]string{}, "x/den/2", ""}, - {[]string{}, "not-exist", ""}, - } - - for _, test := range multiRemoveWithPrefixTests { - if len(test.prefix) > 0 { - err = etcdKV.MultiRemoveWithPrefix(test.prefix) - assert.NoError(t, err) - } - - v, _ := etcdKV.Load(test.testKey) - assert.Equal(t, test.expectedValue, v) - } - - k, v, err := etcdKV.LoadWithPrefix("/") - assert.NoError(t, err) - assert.Zero(t, len(k)) - assert.Zero(t, len(v)) - // MultiSaveAndRemoveWithPrefix err = etcdKV.MultiSave(prepareTests) require.NoError(t, err) @@ -624,7 +585,7 @@ func TestEtcdKV_Load(te *testing.T) { } for _, test := range multiSaveAndRemoveWithPrefixTests { - k, _, err = etcdKV.LoadWithPrefix(test.loadPrefix) + k, _, err := etcdKV.LoadWithPrefix(test.loadPrefix) assert.NoError(t, err) assert.Equal(t, test.lengthBeforeRemove, len(k)) diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 53ae73c378..f96128f9ef 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -58,7 +58,6 @@ type BaseKV interface { type TxnKV interface { BaseKV MultiSaveAndRemove(saves map[string]string, removals []string) error - MultiRemoveWithPrefix(keys []string) error MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error } diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index ad96662336..5a4a7d86ef 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -20,7 +20,6 @@ import ( "strings" "sync" - "github.com/cockroachdb/errors" "github.com/google/btree" "github.com/milvus-io/milvus/pkg/common" ) @@ -282,11 +281,6 @@ func (kv *MemoryKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) func (kv *MemoryKV) Close() { } -// MultiRemoveWithPrefix not implemented -func (kv *MemoryKV) MultiRemoveWithPrefix(keys []string) error { - return errors.New("not implement") -} - // MultiSaveAndRemoveWithPrefix saves key-value pairs in @saves, & remove key with prefix in @removals in MemoryKV atomically. func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { kv.Lock() diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go index ffeb3ac0d9..51eb162782 100644 --- a/internal/kv/rocksdb/rocksdb_kv.go +++ b/internal/kv/rocksdb/rocksdb_kv.go @@ -420,18 +420,6 @@ func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error { return err } -// MultiRemoveWithPrefix is used to remove a batch of key-values with the same prefix -func (kv *RocksdbKV) MultiRemoveWithPrefix(prefixes []string) error { - if kv.DB == nil { - return errors.New("rocksdb instance is nil when do RemoveWithPrefix") - } - writeBatch := gorocksdb.NewWriteBatch() - defer writeBatch.Destroy() - kv.prepareRemovePrefix(prefixes, writeBatch) - err := kv.DB.Write(kv.WriteOptions, writeBatch) - return err -} - // MultiSaveAndRemoveWithPrefix is used to execute a batch operators with the same prefix func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { if kv.DB == nil { diff --git a/internal/kv/rocksdb/rocksdb_kv_test.go b/internal/kv/rocksdb/rocksdb_kv_test.go index 50e1516053..aa6c9ca497 100644 --- a/internal/kv/rocksdb/rocksdb_kv_test.go +++ b/internal/kv/rocksdb/rocksdb_kv_test.go @@ -215,30 +215,6 @@ func TestRocksdbKV_Txn(t *testing.T) { assert.Equal(t, len(keys), 3) assert.Equal(t, len(vals), 3) - removePrefix := []string{"abc", "abd"} - rocksdbKV.MultiRemoveWithPrefix(removePrefix) - - keys, vals, err = rocksdbKV.LoadWithPrefix("") - assert.NoError(t, err) - assert.Equal(t, len(keys), 0) - assert.Equal(t, len(vals), 0) - - err = rocksdbKV.MultiSave(kvs) - assert.NoError(t, err) - keys, vals, err = rocksdbKV.LoadWithPrefix("") - assert.NoError(t, err) - assert.Equal(t, len(keys), 3) - assert.Equal(t, len(vals), 3) - - // test delete the whole table - removePrefix = []string{"", "hello"} - rocksdbKV.MultiRemoveWithPrefix(removePrefix) - - keys, vals, err = rocksdbKV.LoadWithPrefix("") - assert.NoError(t, err) - assert.Equal(t, len(keys), 0) - assert.Equal(t, len(vals), 0) - err = rocksdbKV.MultiSave(kvs) assert.NoError(t, err) keys, vals, err = rocksdbKV.LoadWithPrefix("") @@ -247,7 +223,7 @@ func TestRocksdbKV_Txn(t *testing.T) { assert.Equal(t, len(vals), 3) // test remove and save - removePrefix = []string{"abc", "abd"} + removePrefix := []string{"abc", "abd"} kvs2 := map[string]string{ "abfad": "12345", } diff --git a/internal/kv/tikv/txn_tikv.go b/internal/kv/tikv/txn_tikv.go index 3435d1db99..dfa45b25a2 100644 --- a/internal/kv/tikv/txn_tikv.go +++ b/internal/kv/tikv/txn_tikv.go @@ -473,64 +473,6 @@ func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string return nil } -// MultiRemoveWithPrefix removes the keys with given prefix. -func (kv *txnTiKV) MultiRemoveWithPrefix(prefixes []string) error { - start := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - var logging_error error - defer logWarnOnFailure(&logging_error, "txnTiKV MultiRemoveWithPrefix error", zap.Strings("keys", prefixes), zap.Int("len", len(prefixes))) - - txn, err := beginTxn(kv.txn) - if err != nil { - logging_error = errors.Wrap(err, "Failed to create txn for MultiRemoveWithPrefix") - return logging_error - } - - // Defer a rollback only if the transaction hasn't been committed - defer rollbackOnFailure(&logging_error, txn) - - // Need in order for err to be propogated to the defered logger - // := within forloop will shadow the original variable, resulting in logger - // missing it - for _, prefix := range prefixes { - prefix = path.Join(kv.rootPath, prefix) - // Get the start and end keys for the prefix range - startKey := []byte(prefix) - endKey := tikv.PrefixNextKey([]byte(prefix)) - // Use Scan to iterate over keys in the prefix range - iter, err := txn.Iter(startKey, endKey) - if err != nil { - logging_error = errors.Wrap(err, "Failed to create iterater for MultiRemoveWithPrefix") - return logging_error - } - // Iterate over keys and delete them - for iter.Valid() { - key := iter.Key() - err = txn.Delete(key) - if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiRemoveWithPrefix", string(key))) - return logging_error - } - // Move the iterator to the next key - err = iter.Next() - if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for MultiRemoveWithPrefix", string(key))) - return logging_error - } - } - } - - err = kv.executeTxn(txn, ctx) - if err != nil { - logging_error = errors.Wrap(err, "Failed to commit for MultiRemoveWithPrefix") - return logging_error - } - CheckElapseAndWarn(start, "Slow txnTiKV MultiRemoveWithPrefix() operation", zap.Strings("keys", prefixes)) - return nil -} - // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { start := time.Now() diff --git a/internal/kv/tikv/txn_tikv_test.go b/internal/kv/tikv/txn_tikv_test.go index 8e1229e80e..66b7d20fc3 100644 --- a/internal/kv/tikv/txn_tikv_test.go +++ b/internal/kv/tikv/txn_tikv_test.go @@ -243,7 +243,7 @@ func TestTiKVLoad(te *testing.T) { assert.Empty(t, vs) }) - te.Run("kv MultiRemoveWithPrefix", func(t *testing.T) { + te.Run("kv MultiSaveAndRemoveWithPrefix", func(t *testing.T) { rootPath := "/tikv/test/root/multi_remove_with_prefix" kv := NewTiKV(txnClient, rootPath) defer kv.Close() @@ -258,47 +258,8 @@ func TestTiKVLoad(te *testing.T) { "x/den/2": "200", } - err := kv.MultiSave(prepareTests) - require.NoError(t, err) - - multiRemoveWithPrefixTests := []struct { - prefix []string - - testKey string - expectedValue string - }{ - {[]string{"x/abc"}, "x/abc/1", ""}, - {[]string{}, "x/abc/2", ""}, - {[]string{}, "x/def/1", "10"}, - {[]string{}, "x/def/2", "20"}, - {[]string{}, "x/den/1", "100"}, - {[]string{}, "x/den/2", "200"}, - {[]string{}, "not-exist", ""}, - {[]string{"x/def", "x/den"}, "x/def/1", ""}, - {[]string{}, "x/def/1", ""}, - {[]string{}, "x/def/2", ""}, - {[]string{}, "x/den/1", ""}, - {[]string{}, "x/den/2", ""}, - {[]string{}, "not-exist", ""}, - } - - for _, test := range multiRemoveWithPrefixTests { - if len(test.prefix) > 0 { - err = kv.MultiRemoveWithPrefix(test.prefix) - assert.NoError(t, err) - } - - v, _ := kv.Load(test.testKey) - assert.Equal(t, test.expectedValue, v) - } - - k, v, err := kv.LoadWithPrefix("/") - assert.NoError(t, err) - assert.Zero(t, len(k)) - assert.Zero(t, len(v)) - // MultiSaveAndRemoveWithPrefix - err = kv.MultiSave(prepareTests) + err := kv.MultiSave(prepareTests) require.NoError(t, err) multiSaveAndRemoveWithPrefixTests := []struct { multiSave map[string]string @@ -316,7 +277,7 @@ func TestTiKVLoad(te *testing.T) { } for _, test := range multiSaveAndRemoveWithPrefixTests { - k, _, err = kv.LoadWithPrefix(test.loadPrefix) + k, _, err := kv.LoadWithPrefix(test.loadPrefix) assert.NoError(t, err) assert.Equal(t, test.lengthBeforeRemove, len(k)) @@ -352,8 +313,6 @@ func TestTiKVLoad(te *testing.T) { assert.Error(t, err) err = kv.MultiSaveAndRemoveWithPrefix(map[string]string{"y/c": "vvv"}, []string{"/"}) assert.Error(t, err) - err = kv.MultiRemoveWithPrefix([]string{"x/def", "x/den"}) - assert.Error(t, err) }) te.Run("kv failed to commit txn", func(t *testing.T) { @@ -380,8 +339,6 @@ func TestTiKVLoad(te *testing.T) { assert.Error(t, err) err = kv.MultiSaveAndRemoveWithPrefix(map[string]string{"y/c": "vvv"}, []string{"/"}) assert.Error(t, err) - err = kv.MultiRemoveWithPrefix([]string{"x/def", "x/den"}) - assert.Error(t, err) }) }