enhance: use the only MaxEtcdTxnNum (#33070)

#33071

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/33123/head
smellthemoon 2024-05-17 14:27:42 +08:00 committed by GitHub
parent 1ef975d327
commit 225f4a6134
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 35 additions and 67 deletions

View File

@ -45,8 +45,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var maxEtcdTxnNum = 128
var paginationSize = 2000
type Catalog struct {
@ -341,33 +339,11 @@ func (kc *Catalog) SaveByBatch(kvs map[string]string) error {
saveFn := func(partialKvs map[string]string) error {
return kc.MetaKv.MultiSave(partialKvs)
}
if len(kvs) <= maxEtcdTxnNum {
if err := etcd.SaveByBatch(kvs, saveFn); err != nil {
err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn)
if err != nil {
log.Error("failed to save by batch", zap.Error(err))
return err
}
} else {
// Split kvs into multiple operations to avoid over-sized operations.
// Also make sure kvs of the same segment are not split into different operations.
batch := make(map[string]string)
for k, v := range kvs {
if len(batch) == maxEtcdTxnNum {
if err := etcd.SaveByBatch(batch, saveFn); err != nil {
log.Error("failed to save by batch", zap.Error(err))
return err
}
maps.Clear(batch)
}
batch[k] = v
}
if len(batch) > 0 {
if err := etcd.SaveByBatch(batch, saveFn); err != nil {
log.Error("failed to save by batch", zap.Error(err))
return err
}
}
}
return nil
}
@ -434,7 +410,7 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d
saveFn := func(partialKvs map[string]string) error {
return kc.MetaKv.MultiSave(partialKvs)
}
if err := etcd.SaveByBatch(kvs, saveFn); err != nil {
if err := etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum, saveFn); err != nil {
return err
}

View File

@ -27,10 +27,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
maxTxnNum = 64
)
// prefix/collection/collection_id -> CollectionInfo
// prefix/partitions/collection_id/partition_id -> PartitionInfo
// prefix/aliases/alias_name -> AliasInfo
@ -87,11 +83,13 @@ func BuildAliasPrefixWithDB(dbID int64) string {
return fmt.Sprintf("%s/%s/%d", DatabaseMetaPrefix, Aliases, dbID)
}
func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
saveFn := func(partialKvs map[string]string) error {
return snapshot.MultiSave(partialKvs, ts)
}
if err := etcd.SaveByBatchWithLimit(saves, maxTxnNum, saveFn); err != nil {
if err := etcd.SaveByBatchWithLimit(saves, limit, saveFn); err != nil {
return err
}
@ -104,7 +102,7 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, maxTxnNum int, sa
removeFn := func(partialKeys []string) error {
return snapshot.MultiSaveAndRemoveWithPrefix(nil, partialKeys, ts)
}
return etcd.RemoveByBatchWithLimit(removals, maxTxnNum/2, removeFn)
return etcd.RemoveByBatchWithLimit(removals, limit, removeFn)
}
func (kc *Catalog) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
@ -200,7 +198,9 @@ func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection,
// Though batchSave is not atomic enough, we can promise the atomicity outside.
// Recovering from failure, if we found collection is creating, we should remove all these related meta.
return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
return etcd.SaveByBatchWithLimit(kvs, util.MaxEtcdTxnNum/2, func(partialKvs map[string]string) error {
return kc.Snapshot.MultiSave(partialKvs, ts)
})
}
@ -453,9 +453,9 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
// Though batchMultiSaveAndRemoveWithPrefix is not atomic enough, we can promise atomicity outside.
// If we found collection under dropping state, we'll know that gc is not completely on this collection.
// However, if we remove collection first, we cannot remove other metas.
// We set maxTxnNum to 64, since SnapshotKV may save both snapshot key and the original key if the original key is
// newest.
if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, maxTxnNum, nil, delMetakeysSnap, ts); err != nil {
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
return err
}

View File

@ -949,7 +949,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
return errors.New("error mock MultiSave")
}
saves := map[string]string{"k": "v"}
err := batchMultiSaveAndRemoveWithPrefix(snapshot, maxTxnNum, saves, []string{}, 0)
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
assert.Error(t, err)
})
t.Run("failed to remove", func(t *testing.T) {
@ -962,7 +962,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
}
saves := map[string]string{"k": "v"}
removals := []string{"prefix1", "prefix2"}
err := batchMultiSaveAndRemoveWithPrefix(snapshot, maxTxnNum, saves, removals, 0)
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
@ -983,7 +983,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i)
removals = append(removals, fmt.Sprintf("k%d", i))
}
err := batchMultiSaveAndRemoveWithPrefix(snapshot, 64, saves, removals, 0)
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
assert.NoError(t, err)
})
}

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -596,7 +597,7 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s
removeFn := func(partialKeys []string) error {
return ss.MetaKv.MultiRemove(keyGroup)
}
return etcd.RemoveByBatch(keyGroup, removeFn)
return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn)
}
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {

View File

@ -70,6 +70,8 @@ const (
RoleConfigObjectName = "object_name"
RoleConfigDBName = "db_name"
RoleConfigPrivilege = "privilege"
MaxEtcdTxnNum = 128
)
const (

View File

@ -33,8 +33,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
)
var maxTxnNum = 128
// GetEtcdClient returns etcd client
// should only used for test
func GetEtcdClient(
@ -191,11 +189,6 @@ func SaveByBatchWithLimit(kvs map[string]string, limit int, op func(partialKvs m
return nil
}
// SaveByBatch there will not guarantee atomicity.
func SaveByBatch(kvs map[string]string, op func(partialKvs map[string]string) error) error {
return SaveByBatchWithLimit(kvs, maxTxnNum, op)
}
func RemoveByBatchWithLimit(removals []string, limit int, op func(partialKeys []string) error) error {
if len(removals) == 0 {
return nil
@ -211,10 +204,6 @@ func RemoveByBatchWithLimit(removals []string, limit int, op func(partialKeys []
return nil
}
func RemoveByBatch(removals []string, op func(partialKeys []string) error) error {
return RemoveByBatchWithLimit(removals, maxTxnNum, op)
}
func buildKvGroup(keys, values []string) (map[string]string, error) {
if len(keys) != len(values) {
return nil, fmt.Errorf("length of keys (%d) and values (%d) are not equal", len(keys), len(values))

View File

@ -104,8 +104,8 @@ func Test_SaveByBatch(t *testing.T) {
return nil
}
maxTxnNum = 2
err := SaveByBatch(kvs, saveFn)
limit := 2
err := SaveByBatchWithLimit(kvs, limit, saveFn)
assert.NoError(t, err)
assert.Equal(t, 0, group)
assert.Equal(t, 0, count)
@ -126,8 +126,8 @@ func Test_SaveByBatch(t *testing.T) {
return nil
}
maxTxnNum = 2
err := SaveByBatch(kvs, saveFn)
limit := 2
err := SaveByBatchWithLimit(kvs, limit, saveFn)
assert.NoError(t, err)
assert.Equal(t, 2, group)
assert.Equal(t, 3, count)
@ -142,8 +142,8 @@ func Test_SaveByBatch(t *testing.T) {
"k2": "v2",
"k3": "v3",
}
maxTxnNum = 2
err := SaveByBatch(kvs, saveFn)
limit := 2
err := SaveByBatchWithLimit(kvs, limit, saveFn)
assert.Error(t, err)
})
}
@ -160,8 +160,8 @@ func Test_RemoveByBatch(t *testing.T) {
return nil
}
maxTxnNum = 2
err := RemoveByBatch(kvs, removeFn)
limit := 2
err := RemoveByBatchWithLimit(kvs, limit, removeFn)
assert.NoError(t, err)
assert.Equal(t, 0, group)
assert.Equal(t, 0, count)
@ -178,8 +178,8 @@ func Test_RemoveByBatch(t *testing.T) {
return nil
}
maxTxnNum = 2
err := RemoveByBatch(kvs, removeFn)
limit := 2
err := RemoveByBatchWithLimit(kvs, limit, removeFn)
assert.NoError(t, err)
assert.Equal(t, 3, group)
assert.Equal(t, 5, count)
@ -190,8 +190,8 @@ func Test_RemoveByBatch(t *testing.T) {
return errors.New("mock")
}
kvs := []string{"k1", "k2", "k3", "k4", "k5"}
maxTxnNum = 2
err := RemoveByBatch(kvs, removeFn)
limit := 2
err := RemoveByBatchWithLimit(kvs, limit, removeFn)
assert.Error(t, err)
})
}