From 36b8fbbadc31e2c53f0cc19103a169d05d4dff2d Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Wed, 24 Jan 2024 14:23:00 +0800 Subject: [PATCH] fix: Donot set metrics for compactTo 0 rows seg (#30126) See also: #29204 Signed-off-by: yangxuan --- internal/datacoord/meta.go | 8 +- internal/datacoord/meta_test.go | 176 ++++++++++++++++---------------- 2 files changed, 95 insertions(+), 89 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index d101790da6..61f0701329 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1020,6 +1020,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, } // find new added delta logs when executing compaction + // TODO: won't be needed when enable L0 Segment var originDeltalogs []*datapb.FieldBinlog for _, s := range modSegments { originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...) @@ -1064,7 +1065,12 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, Level: datapb.SegmentLevel_L1, } segment := NewSegmentInfo(segmentInfo) - metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) + + // L1 segment with NumRows=0 will be discarded, so no need to change the metric + if segmentInfo.GetNumOfRows() > 0 { + metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows()) + } + log.Info("meta update: prepare for complete compaction mutation - complete", zap.Int64("collectionID", segment.GetCollectionID()), zap.Int64("partitionID", segment.GetPartitionID()), diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 3745b5b37b..849c295dfa 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -25,7 +25,6 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -198,6 +197,94 @@ func (suite *MetaBasicSuite) TestCollection() { suite.MetricsEqual(metrics.DataCoordNumCollections.WithLabelValues(), 1) } +func (suite *MetaBasicSuite) TestPrepareCompleteCompactionMutation() { + prepareSegments := &SegmentsInfo{ + map[UniqueID]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, + NumOfRows: 1, + }}, + 2: {SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, + NumOfRows: 1, + }}, + }, + } + + // m := suite.meta + m := &meta{ + catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, + segments: prepareSegments, + } + + plan := &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 1, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, + }, + { + SegmentID: 2, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, + }, + }, + StartTime: 15, + } + + inSegment := &datapb.CompactionSegment{ + SegmentID: 3, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)}, + NumOfRows: 2, + } + inCompactionResult := &datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{inSegment}, + } + afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult) + suite.NoError(err) + suite.NotNil(afterCompact) + suite.NotNil(newSegment) + suite.Equal(2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()])) + suite.Equal(1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()])) + suite.Equal(int64(0), metricMutation.rowCountChange) + suite.Equal(int64(2), metricMutation.rowCountAccChange) + + suite.Require().Equal(2, len(afterCompact)) + suite.Equal(commonpb.SegmentState_Dropped, afterCompact[0].GetState()) + suite.Equal(commonpb.SegmentState_Dropped, afterCompact[1].GetState()) + suite.NotZero(afterCompact[0].GetDroppedAt()) + suite.NotZero(afterCompact[1].GetDroppedAt()) + + suite.Equal(inSegment.SegmentID, newSegment.GetID()) + suite.Equal(UniqueID(100), newSegment.GetCollectionID()) + suite.Equal(UniqueID(10), newSegment.GetPartitionID()) + suite.Equal(inSegment.NumOfRows, newSegment.GetNumOfRows()) + suite.Equal(commonpb.SegmentState_Flushed, newSegment.GetState()) + + suite.EqualValues(inSegment.GetInsertLogs(), newSegment.GetBinlogs()) + suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs()) + suite.EqualValues(inSegment.GetDeltalogs(), newSegment.GetDeltalogs()) + suite.NotZero(newSegment.lastFlushTime) + suite.Equal(uint64(15), newSegment.GetLastExpireTime()) +} + func TestMeta(t *testing.T) { suite.Run(t, new(MetaBasicSuite)) suite.Run(t, new(MetaReloadSuite)) @@ -692,93 +779,6 @@ func TestMeta_alterMetaStore(t *testing.T) { assert.NoError(t, err) } -func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { - prepareSegments := &SegmentsInfo{ - map[UniqueID]*SegmentInfo{ - 1: {SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - CollectionID: 100, - PartitionID: 10, - State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, - NumOfRows: 1, - }}, - 2: {SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - CollectionID: 100, - PartitionID: 10, - State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, - NumOfRows: 1, - }}, - }, - } - - m := &meta{ - catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}, - segments: prepareSegments, - } - - plan := &datapb.CompactionPlan{ - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: 1, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)}, - }, - { - SegmentID: 2, - FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)}, - }, - }, - StartTime: 15, - } - - inSegment := &datapb.CompactionSegment{ - SegmentID: 3, - InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, - Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)}, - NumOfRows: 2, - } - inCompactionResult := &datapb.CompactionPlanResult{ - Segments: []*datapb.CompactionSegment{inSegment}, - } - afterCompact, newSegment, metricMutation, err := m.PrepareCompleteCompactionMutation(plan, inCompactionResult) - assert.NoError(t, err) - assert.NotNil(t, afterCompact) - assert.NotNil(t, newSegment) - assert.Equal(t, 2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()])) - assert.Equal(t, 1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()])) - assert.Equal(t, int64(0), metricMutation.rowCountChange) - assert.Equal(t, int64(2), metricMutation.rowCountAccChange) - - require.Equal(t, 2, len(afterCompact)) - assert.Equal(t, commonpb.SegmentState_Dropped, afterCompact[0].GetState()) - assert.Equal(t, commonpb.SegmentState_Dropped, afterCompact[1].GetState()) - assert.NotZero(t, afterCompact[0].GetDroppedAt()) - assert.NotZero(t, afterCompact[1].GetDroppedAt()) - - assert.Equal(t, inSegment.SegmentID, newSegment.GetID()) - assert.Equal(t, UniqueID(100), newSegment.GetCollectionID()) - assert.Equal(t, UniqueID(10), newSegment.GetPartitionID()) - assert.Equal(t, inSegment.NumOfRows, newSegment.GetNumOfRows()) - assert.Equal(t, commonpb.SegmentState_Flushed, newSegment.GetState()) - - assert.EqualValues(t, inSegment.GetInsertLogs(), newSegment.GetBinlogs()) - assert.EqualValues(t, inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs()) - assert.EqualValues(t, inSegment.GetDeltalogs(), newSegment.GetDeltalogs()) - assert.NotZero(t, newSegment.lastFlushTime) - assert.Equal(t, uint64(15), newSegment.GetLastExpireTime()) -} - func Test_meta_SetSegmentCompacting(t *testing.T) { type fields struct { client kv.MetaKv