diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 4d1e3ddda4..46d287c91c 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/util/metautil" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -545,8 +547,28 @@ func TestGetSegmentInfo(t *testing.T) { defer closeTestServer(t, svr) segInfo := &datapb.SegmentInfo{ - ID: 0, - State: commonpb.SegmentState_Flushed, + ID: 0, + State: commonpb.SegmentState_Flushed, + NumOfRows: 100, + Binlogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801), + }, + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802), + }, + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803), + }, + }, + }, + }, } err := svr.meta.AddSegment(NewSegmentInfo(segInfo)) assert.Nil(t, err) @@ -555,6 +577,9 @@ func TestGetSegmentInfo(t *testing.T) { SegmentIDs: []int64{0}, } resp, err := svr.GetSegmentInfo(svr.ctx, req) + assert.Equal(t, 1, len(resp.GetInfos())) + // Check that # of rows is corrected from 100 to 60. 
+ assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows()) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) @@ -2318,7 +2343,41 @@ func TestGetRecoveryInfo(t *testing.T) { assert.NoError(t, err) seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed) + seg1.Binlogs = []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + }, + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + }, + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + }, + }, + }, + } seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed) + seg2.Binlogs = []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 30, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + }, + { + EntriesNum: 70, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + }, + }, + }, + } err = svr.meta.AddSegment(NewSegmentInfo(seg1)) assert.Nil(t, err) err = svr.meta.AddSegment(NewSegmentInfo(seg2)) @@ -2362,8 +2421,11 @@ func TestGetRecoveryInfo(t *testing.T) { assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.EqualValues(t, 1, len(resp.GetChannels())) assert.EqualValues(t, 0, len(resp.GetChannels()[0].GetUnflushedSegmentIds())) - //assert.ElementsMatch(t, []*datapb.SegmentInfo{trimSegmentInfo(seg1), trimSegmentInfo(seg2)}, resp.GetChannels()[0].GetFlushedSegments()) + assert.ElementsMatch(t, []int64{0, 1}, resp.GetChannels()[0].GetFlushedSegmentIds()) assert.EqualValues(t, 10, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) + assert.EqualValues(t, 2, len(resp.GetBinlogs())) + // Row count corrected from 100 + 100 -> 100 + 60. 
+ assert.EqualValues(t, 160, resp.GetBinlogs()[0].GetNumOfRows()+resp.GetBinlogs()[1].GetNumOfRows()) }) t.Run("test get recovery of unflushed segments ", func(t *testing.T) { @@ -2386,7 +2448,41 @@ func TestGetRecoveryInfo(t *testing.T) { assert.NoError(t, err) seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing) + seg1.Binlogs = []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + }, + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + }, + { + EntriesNum: 20, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + }, + }, + }, + } seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing) + seg2.Binlogs = []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 30, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + }, + { + EntriesNum: 70, + LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + }, + }, + }, + } err = svr.meta.AddSegment(NewSegmentInfo(seg1)) assert.Nil(t, err) err = svr.meta.AddSegment(NewSegmentInfo(seg2)) @@ -2884,7 +2980,7 @@ type rootCoordSegFlushComplete struct { flag bool } -//SegmentFlushCompleted, override default behavior +// SegmentFlushCompleted, override default behavior func (rc *rootCoordSegFlushComplete) SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { if rc.flag { return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8fee875959..9cd6f12e1e 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -23,25 +23,24 @@ import ( "strconv" "sync" - "github.com/milvus-io/milvus/internal/util/commonpbutil" - "github.com/milvus-io/milvus/internal/util/errorutil" - - "golang.org/x/sync/errgroup" - 
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/commonpbutil" + "github.com/milvus-io/milvus/internal/util/errorutil" "github.com/milvus-io/milvus/internal/util/logutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/samber/lo" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) // checks whether server in Healthy State @@ -335,6 +334,7 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes } // GetSegmentInfo returns segment info requested, status, row count, etc included +// Called by: QueryCoord, DataNode, IndexCoord, Proxy. func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { resp := &datapb.GetSegmentInfoResponse{ Status: &commonpb.Status{ @@ -359,20 +359,22 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } child := s.meta.GetCompactionTo(id) + clonedInfo := info.Clone() if child != nil { - info = info.Clone() - info.Deltalogs = append(info.Deltalogs, child.GetDeltalogs()...) - info.DmlPosition = child.GetDmlPosition() + clonedInfo.Deltalogs = append(clonedInfo.Deltalogs, child.GetDeltalogs()...) 
+ clonedInfo.DmlPosition = child.GetDmlPosition() } - - infos = append(infos, info.SegmentInfo) + segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo) + infos = append(infos, clonedInfo.SegmentInfo) } else { info = s.meta.GetSegment(id) if info == nil { resp.Status.Reason = msgSegmentNotFound(id) return resp, nil } - infos = append(infos, info.SegmentInfo) + clonedInfo := info.Clone() + segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo) + infos = append(infos, clonedInfo.SegmentInfo) } vchannel := info.InsertChannel if _, ok := channelCPs[vchannel]; vchannel != "" && !ok { @@ -591,7 +593,8 @@ func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentSta return resp, nil } -// GetRecoveryInfo get recovery info for segment +// GetRecoveryInfo get recovery info for segment. +// Called by: QueryCoord. func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { collectionID := req.GetCollectionID() partitionID := req.GetPartitionID() @@ -678,7 +681,15 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs) } - segmentsNumOfRows[id] = segment.NumOfRows + if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows { + log.Warn("segment row number meta inconsistent with bin log row count and will be corrected", + zap.Int64("segment ID", segment.GetID()), + zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()), + zap.Int64("segment bin log row count (correct)", newCount)) + segmentsNumOfRows[id] = newCount + } else { + segmentsNumOfRows[id] = segment.NumOfRows + } statsBinlogs := segment.GetStatslogs() field2StatsBinlog := make(map[UniqueID][]*datapb.Binlog) diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 6a82f9de32..2d3d6ec2ce 100644 --- a/internal/datacoord/util.go +++ 
b/internal/datacoord/util.go @@ -23,10 +23,9 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/common" - "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index da81f656c2..a08f444ca3 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/metautil" + "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -229,6 +230,25 @@ func (kc *Catalog) AddSegment(ctx context.Context, segment *datapb.SegmentInfo) return kc.MetaKv.MultiSave(kvs) } +// LoadFromSegmentPath loads segment info from persistent storage by given segment path. 
+// # TESTING ONLY # +func (kc *Catalog) LoadFromSegmentPath(colID, partID, segID typeutil.UniqueID) (*datapb.SegmentInfo, error) { + v, err := kc.MetaKv.Load(buildSegmentPath(colID, partID, segID)) + if err != nil { + log.Error("(testing only) failed to load segment info by segment path") + return nil, err + } + + segInfo := &datapb.SegmentInfo{} + err = proto.Unmarshal([]byte(v), segInfo) + if err != nil { + log.Error("(testing only) failed to unmarshall segment info") + return nil, err + } + + return segInfo, nil +} + func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo) error { if len(newSegments) == 0 { return nil @@ -318,7 +338,8 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [ for _, s := range segments { noBinlogsSegment, binlogs, deltalogs, statslogs := CloneSegmentWithExcludeBinlogs(s) - + // `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`. + segmentutil.ReCalcRowCount(s, noBinlogsSegment) // for compacted segments if noBinlogsSegment.State == commonpb.SegmentState_Dropped { hasBinlogkeys, err := kc.hasBinlogPrefix(s) @@ -408,6 +429,8 @@ func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*d for _, s := range segments { key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID()) noBinlogsSegment, _, _, _ := CloneSegmentWithExcludeBinlogs(s) + // `s` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `s`. 
+ segmentutil.ReCalcRowCount(s, noBinlogsSegment) segBytes, err := proto.Marshal(noBinlogsSegment) if err != nil { return fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err) @@ -666,6 +689,8 @@ func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.Uniqu func buildSegmentAndBinlogsKvs(segment *datapb.SegmentInfo) (map[string]string, error) { noBinlogsSegment, binlogs, deltalogs, statslogs := CloneSegmentWithExcludeBinlogs(segment) + // `segment` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `segment`. + segmentutil.ReCalcRowCount(segment, noBinlogsSegment) // save binlogs separately kvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs) diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 5242874acc..27419df787 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -24,8 +24,8 @@ type MockedTxnKV struct { multiSave func(kvs map[string]string) error save func(key, value string) error loadWithPrefix func(key string) ([]string, []string, error) - multiRemove func(keys []string) error load func(key string) (string, error) + multiRemove func(keys []string) error walkWithPrefix func(prefix string, paginationSize int, fn func([]byte, []byte) error) error remove func(key string) error } @@ -311,15 +311,26 @@ func Test_AddSegments(t *testing.T) { t.Run("save successfully", func(t *testing.T) { txn := &MockedTxnKV{} - var savedKvs map[string]string + savedKvs := make(map[string]string) txn.multiSave = func(kvs map[string]string) error { savedKvs = kvs return nil } + txn.load = func(key string) (string, error) { + if v, ok := savedKvs[key]; ok { + return v, nil + } + return "", errors.New("key not found") + } catalog := NewCatalog(txn, rootPath, "") err := 
catalog.AddSegment(context.TODO(), segment1) assert.Nil(t, err) + adjustedSeg, err := catalog.LoadFromSegmentPath(segment1.CollectionID, segment1.PartitionID, segment1.ID) + assert.NoError(t, err) + // Check that num of rows is corrected from 100 to 5. + assert.Equal(t, int64(100), segment1.GetNumOfRows()) + assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) _, ok := savedKvs[k4] assert.False(t, ok) @@ -387,6 +398,12 @@ func Test_AlterSegments(t *testing.T) { opGroupCount++ return nil } + txn.load = func(key string) (string, error) { + if v, ok := savedKvs[key]; ok { + return v, nil + } + return "", errors.New("key not found") + } catalog := NewCatalog(txn, rootPath, "") err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{}) @@ -420,6 +437,12 @@ assert.Nil(t, err) assert.Equal(t, 255+3, len(savedKvs)) assert.Equal(t, 5, opGroupCount) + + adjustedSeg, err := catalog.LoadFromSegmentPath(segmentXL.CollectionID, segmentXL.PartitionID, segmentXL.ID) + assert.NoError(t, err) + // Check that num of rows is corrected from 100 to 5 (CalcRowCountFromBinLog sums only the first field's binlog entries). 
+ assert.Equal(t, int64(100), segmentXL.GetNumOfRows()) + assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) }) } @@ -453,10 +476,15 @@ func Test_AlterSegmentsAndAddNewSegment(t *testing.T) { maps.Copy(savedKvs, kvs) return nil } - txn.loadWithPrefix = func(key string) ([]string, []string, error) { return []string{}, []string{}, nil } + txn.load = func(key string) (string, error) { + if v, ok := savedKvs[key]; ok { + return v, nil + } + return "", errors.New("key not found") + } catalog := NewCatalog(txn, rootPath, "") err := catalog.AlterSegmentsAndAddNewSegment(context.TODO(), []*datapb.SegmentInfo{droppedSegment}, segment1) @@ -465,6 +493,18 @@ assert.Equal(t, 8, len(savedKvs)) verifySavedKvsForDroppedSegment(t, savedKvs) verifySavedKvsForSegment(t, savedKvs) + + adjustedSeg, err := catalog.LoadFromSegmentPath(droppedSegment.CollectionID, droppedSegment.PartitionID, droppedSegment.ID) + assert.NoError(t, err) + // Check that num of rows is corrected from 100 to 5. + assert.Equal(t, int64(100), droppedSegment.GetNumOfRows()) + assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) + + adjustedSeg, err = catalog.LoadFromSegmentPath(segment1.CollectionID, segment1.PartitionID, segment1.ID) + assert.NoError(t, err) + // Check that num of rows is corrected from 100 to 5. + assert.Equal(t, int64(100), segment1.GetNumOfRows()) + assert.Equal(t, int64(5), adjustedSeg.GetNumOfRows()) }) } diff --git a/internal/util/segmentutil/utils.go b/internal/util/segmentutil/utils.go new file mode 100644 index 0000000000..07f3046dfa --- /dev/null +++ b/internal/util/segmentutil/utils.go @@ -0,0 +1,33 @@ +package segmentutil + +import ( + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" + "go.uber.org/zap" +) + +// ReCalcRowCount re-calculates number of rows of `seg` based on its bin log count, and corrects its value in its +// cloned copy, which is `segCloned`. 
+// Note that `segCloned` should be a copied version of `seg`. +func ReCalcRowCount(seg, segCloned *datapb.SegmentInfo) { + // `segment` is not mutated but only cloned above and is safe to be referred here. + if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() { + log.Warn("segment row number meta inconsistent with bin log row count and will be corrected", + zap.Int64("segment ID", seg.GetID()), + zap.Int64("segment meta row count (wrong)", seg.GetNumOfRows()), + zap.Int64("segment bin log row count (correct)", newCount)) + // Update the corrected row count. + segCloned.NumOfRows = newCount + } +} + +// CalcRowCountFromBinLog calculates # of rows of a segment from bin logs +func CalcRowCountFromBinLog(seg *datapb.SegmentInfo) int64 { + var rowCt int64 + if len(seg.GetBinlogs()) > 0 { + for _, ct := range seg.GetBinlogs()[0].GetBinlogs() { + rowCt += ct.GetEntriesNum() + } + } + return rowCt +}