From f3ef430c2ef3e422b0c509ef1c6dd68600f7f029 Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Tue, 15 Nov 2022 16:45:07 +0800 Subject: [PATCH] Keep individual segment's updates in a single Etcd operation (#20525) /kind improvement Signed-off-by: Yuchen Gao Signed-off-by: Yuchen Gao --- internal/metastore/kv/datacoord/kv_catalog.go | 35 ++++++++++---- .../metastore/kv/datacoord/kv_catalog_test.go | 48 +++++++++++++++++++ 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index a814c7139c..c3909e4b40 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -38,6 +38,8 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" ) +const maxEtcdTxnNum = 64 + type Catalog struct { Txn kv.TxnKV ChunkManagerRootPath string @@ -102,23 +104,40 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm if len(newSegments) == 0 { return nil } - - kvs := make(map[string]string) + kvsBySeg := make(map[int64]map[string]string) for _, segment := range newSegments { segmentKvs, err := buildSegmentAndBinlogsKvs(segment) if err != nil { return err } - maps.Copy(kvs, segmentKvs) + kvsBySeg[segment.GetID()] = make(map[string]string) + maps.Copy(kvsBySeg[segment.GetID()], segmentKvs) } - + // Split kvs into multiple operations to avoid over-sized operations. + // Also make sure kvs of the same segment are not split into different operations. + kvsPiece := make(map[string]string) + currSize := 0 saveFn := func(partialKvs map[string]string) error { return kc.Txn.MultiSave(partialKvs) } - if err := etcd.SaveByBatch(kvs, saveFn); err != nil { - return err + for _, kvs := range kvsBySeg { + if currSize+len(kvs) >= maxEtcdTxnNum { + if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil { + log.Error("failed to save by batch", zap.Error(err)) + return err + } + kvsPiece = make(map[string]string) + currSize = 0 + } + maps.Copy(kvsPiece, kvs) + currSize += len(kvs) + } + if currSize > 0 { + if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil { + log.Error("failed to save by batch", zap.Error(err)) + return err + } } - return nil } @@ -219,7 +238,7 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [ return kc.Txn.MultiSave(kvs) } -// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AtlerSegmentsAndAddNewSegment +// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AlterSegmentsAndAddNewSegment func (kc *Catalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, oldSegments []*datapb.SegmentInfo, removeSegment *datapb.SegmentInfo) error { var ( kvs = make(map[string]string) diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 021c87d5f4..ddf3ad65e4 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -335,6 +335,54 @@ func Test_AlterSegments(t *testing.T) { assert.Equal(t, 4, len(savedKvs)) verifySavedKvsForSegment(t, savedKvs) }) + + t.Run("save large ops successfully", func(t *testing.T) { + txn := &MockedTxnKV{} + savedKvs := make(map[string]string) + opGroupCount := 0 + txn.multiSave = func(kvs map[string]string) error { + var ks []string + for k := range kvs { + ks = append(ks, k) + } + maps.Copy(savedKvs, kvs) + opGroupCount++ + return nil + } + + catalog := &Catalog{txn, "a"} + err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{}) + assert.Nil(t, err) + + var binlogXL []*datapb.FieldBinlog + for i := 0; i < 255; i++ { + binlogXL = append(binlogXL, &datapb.FieldBinlog{ + FieldID: int64(i), + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogPath: binlogPath, + }, + }, + }) + } + + segmentXL := &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 100, + State: commonpb.SegmentState_Flushed, + Binlogs: binlogXL, + Deltalogs: deltalogs, + Statslogs: statslogs, + } + + err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL}) + assert.Nil(t, err) + assert.Equal(t, 255+3, len(savedKvs)) + assert.Equal(t, 5, opGroupCount) + }) } func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {