Keep individual segment's updates in a single Etcd operation (#20525)

/kind improvement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
pull/20614/head
Ten Thousand Leaves 2022-11-15 16:45:07 +08:00 committed by GitHub
parent 35ebd17fe4
commit f3ef430c2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 8 deletions

View File

@ -38,6 +38,8 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const maxEtcdTxnNum = 64
type Catalog struct {
Txn kv.TxnKV
ChunkManagerRootPath string
@ -102,23 +104,40 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
if len(newSegments) == 0 {
return nil
}
kvs := make(map[string]string)
kvsBySeg := make(map[int64]map[string]string)
for _, segment := range newSegments {
segmentKvs, err := buildSegmentAndBinlogsKvs(segment)
if err != nil {
return err
}
maps.Copy(kvs, segmentKvs)
kvsBySeg[segment.GetID()] = make(map[string]string)
maps.Copy(kvsBySeg[segment.GetID()], segmentKvs)
}
// Split kvs into multiple operations to avoid over-sized operations.
// Also make sure kvs of the same segment are not split into different operations.
kvsPiece := make(map[string]string)
currSize := 0
saveFn := func(partialKvs map[string]string) error {
return kc.Txn.MultiSave(partialKvs)
}
if err := etcd.SaveByBatch(kvs, saveFn); err != nil {
return err
for _, kvs := range kvsBySeg {
if currSize+len(kvs) >= maxEtcdTxnNum {
if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil {
log.Error("failed to save by batch", zap.Error(err))
return err
}
kvsPiece = make(map[string]string)
currSize = 0
}
maps.Copy(kvsPiece, kvs)
currSize += len(kvs)
}
if currSize > 0 {
if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil {
log.Error("failed to save by batch", zap.Error(err))
return err
}
}
return nil
}
@ -219,7 +238,7 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
return kc.Txn.MultiSave(kvs)
}
// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AtlerSegmentsAndAddNewSegment
// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AlterSegmentsAndAddNewSegment
func (kc *Catalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, oldSegments []*datapb.SegmentInfo, removeSegment *datapb.SegmentInfo) error {
var (
kvs = make(map[string]string)

View File

@ -335,6 +335,54 @@ func Test_AlterSegments(t *testing.T) {
assert.Equal(t, 4, len(savedKvs))
verifySavedKvsForSegment(t, savedKvs)
})
t.Run("save large ops successfully", func(t *testing.T) {
txn := &MockedTxnKV{}
savedKvs := make(map[string]string)
opGroupCount := 0
txn.multiSave = func(kvs map[string]string) error {
var ks []string
for k := range kvs {
ks = append(ks, k)
}
maps.Copy(savedKvs, kvs)
opGroupCount++
return nil
}
catalog := &Catalog{txn, "a"}
err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{})
assert.Nil(t, err)
var binlogXL []*datapb.FieldBinlog
for i := 0; i < 255; i++ {
binlogXL = append(binlogXL, &datapb.FieldBinlog{
FieldID: int64(i),
Binlogs: []*datapb.Binlog{
{
EntriesNum: 5,
LogPath: binlogPath,
},
},
})
}
segmentXL := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
NumOfRows: 100,
State: commonpb.SegmentState_Flushed,
Binlogs: binlogXL,
Deltalogs: deltalogs,
Statslogs: statslogs,
}
err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL})
assert.Nil(t, err)
assert.Equal(t, 255+3, len(savedKvs))
assert.Equal(t, 5, opGroupCount)
})
}
func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {