diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index b1af7634e4..c4864bc39b 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -774,11 +774,13 @@ func isExpandableSmallSegment(segment *SegmentInfo, expectedSize int64) bool {
 
 func isDeltalogTooManySegment(segment *SegmentInfo) bool {
 	deltaLogCount := GetBinlogCount(segment.GetDeltalogs())
-	log.Debug("isDeltalogTooManySegment",
+	res := deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt()
+	log.Debug("check whether segment has too many delta log",
 		zap.Int64("collectionID", segment.CollectionID),
 		zap.Int64("segmentID", segment.ID),
-		zap.Int("deltaLogCount", deltaLogCount))
-	return deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt()
+		zap.Int("deltaLogCount", deltaLogCount),
+		zap.Bool("result", res))
+	return res
 }
 
 func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
@@ -792,16 +794,15 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
 	}
 
 	// currently delta log size and delete ratio policy is applied
-	is := float64(totalDeletedRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() ||
+	res := float64(totalDeletedRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() ||
 		totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize.GetAsInt64()
-	if is {
-		log.Info("total delete entities is too much",
-			zap.Int64("segmentID", segment.ID),
-			zap.Int64("numRows", segment.GetNumOfRows()),
-			zap.Int("deleted rows", totalDeletedRows),
-			zap.Int64("delete log size", totalDeleteLogSize))
-	}
-	return is
+	log.Debug("check whether segment has too many delete data",
+		zap.Int64("segmentID", segment.ID),
+		zap.Int64("numRows", segment.GetNumOfRows()),
+		zap.Int("deleted rows", totalDeletedRows),
+		zap.Int64("delete log size", totalDeleteLogSize),
+		zap.Bool("result", res))
+	return res
 }
 
 func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 31c7d713f0..53a8a12ff4 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -233,7 +233,7 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact
 			log.Debug("Start to trigger a single compaction by TriggerTypeSingle")
 			outView, reason := view.Trigger()
 			if outView != nil {
-				log.Info("Success to trigger a MixCompaction output view, try to submit",
+				log.Info("Success to trigger a L2SingleCompaction output view, try to submit",
 					zap.String("reason", reason),
 					zap.String("output view", outView.String()))
 				m.SubmitSingleViewToScheduler(ctx, outView)
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index f54df98b8e..7a3cc9c55a 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -1490,6 +1490,9 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
 		cloned := segment.Clone()
 		cloned.DroppedAt = uint64(time.Now().UnixNano())
 		cloned.Compacted = true
+		// erase the level and partitionStats version to fix https://github.com/milvus-io/milvus/issues/35003
+		cloned.PartitionStatsVersion = 0
+		cloned.Level = datapb.SegmentLevel_L1
 		compactFromSegInfos = append(compactFromSegInfos, cloned)
 		compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
 	}
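The meta.go hunk above is the core of the fix: the clone of a compacted-away L2 segment used to keep Level=L2 and a live PartitionStatsVersion, so the dropped segment could still be matched against a partition-stats version (issue 35003). A minimal sketch of the normalization, assuming the datapb import path used elsewhere in this repo; normalizeDroppedSegment is a hypothetical helper, not a function in the Milvus codebase:

package datacoord

import (
	"time"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// normalizeDroppedSegment mirrors the mutation in completeMixCompactionMutation:
// mark the clone dropped, then strip the L2-specific metadata so the dropped
// segment can no longer pair with a partitionStats version (issue 35003).
func normalizeDroppedSegment(cloned *datapb.SegmentInfo) {
	cloned.DroppedAt = uint64(time.Now().UnixNano())
	cloned.Compacted = true
	cloned.PartitionStatsVersion = 0
	cloned.Level = datapb.SegmentLevel_L1
}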
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index d78540f838..1a3d9ba894 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -277,6 +277,73 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
 	suite.EqualValues(2, mutation.rowCountAccChange)
 }
 
+// regression test for https://github.com/milvus-io/milvus/issues/35003
+func (suite *MetaBasicSuite) TestCompleteCompactionMutationForL2Single() {
+	latestSegments := NewSegmentsInfo()
+	for segID, segment := range map[UniqueID]*SegmentInfo{
+		1: {SegmentInfo: &datapb.SegmentInfo{
+			ID:           1,
+			CollectionID: 100,
+			PartitionID:  10,
+			State:        commonpb.SegmentState_Flushed,
+			Level:        datapb.SegmentLevel_L2,
+			Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
+			Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
+			// the latest segment has 2 deltalogs: one submitted for compaction, one appended before the compaction finished
+			Deltalogs:             []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
+			NumOfRows:             2,
+			PartitionStatsVersion: int64(10001),
+		}},
+		2: {SegmentInfo: &datapb.SegmentInfo{
+			ID:           2,
+			CollectionID: 100,
+			PartitionID:  10,
+			State:        commonpb.SegmentState_Flushed,
+			Level:        datapb.SegmentLevel_L2,
+			Binlogs:      []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
+			Statslogs:    []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
+			// the latest segment has 2 deltalogs: one submitted for compaction, one appended before the compaction finished
+			Deltalogs:             []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
+			NumOfRows:             2,
+			PartitionStatsVersion: int64(10001),
+		}},
+	} {
+		latestSegments.SetSegment(segID, segment)
+	}
+
+	mockChMgr := mocks.NewChunkManager(suite.T())
+	m := &meta{
+		catalog:      &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
+		segments:     latestSegments,
+		chunkManager: mockChMgr,
+	}
+
+	compactToSeg := &datapb.CompactionSegment{
+		SegmentID:           3,
+		InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
+		Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
+		NumOfRows:           2,
+	}
+
+	result := &datapb.CompactionPlanResult{
+		Segments: []*datapb.CompactionSegment{compactToSeg},
+	}
+	task := &datapb.CompactionTask{
+		InputSegments: []UniqueID{1, 2},
+		Type:          datapb.CompactionType_MixCompaction,
+	}
+
+	infos, _, err := m.CompleteCompactionMutation(task, result)
+	assert.NoError(suite.T(), err)
+	assert.Equal(suite.T(), 1, len(infos))
+	seg1 := m.GetSegment(1)
+	seg2 := m.GetSegment(2)
+	assert.Equal(suite.T(), int64(0), seg1.GetPartitionStatsVersion())
+	assert.Equal(suite.T(), int64(0), seg2.GetPartitionStatsVersion())
+	assert.Equal(suite.T(), datapb.SegmentLevel_L1, seg1.GetLevel())
+	assert.Equal(suite.T(), datapb.SegmentLevel_L1, seg2.GetLevel())
+}
+
 func (suite *MetaBasicSuite) TestSetSegment() {
 	meta := suite.meta
 	catalog := mocks2.NewDataCoordCatalog(suite.T())
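Stripped of the Params plumbing, the delete trigger rewritten in compaction_trigger.go earlier in this patch reduces to a pure predicate over four numbers; a sketch with hypothetical names (tooManyDeletes and its parameters are illustrations, not Milvus APIs):

// tooManyDeletes distills isDeleteRowsTooManySegment: a single compaction is
// warranted when deletes exceed the row-count ratio OR the deltalog-size cap.
func tooManyDeletes(deletedRows, numRows, deltaLogSize, maxDeltaLogSize int64, ratioThreshold float64) bool {
	return float64(deletedRows)/float64(numRows) >= ratioThreshold ||
		deltaLogSize > maxDeltaLogSize
}

For example, with a 0.2 ratio threshold, a 100-row segment with 20 deleted rows trips the first branch regardless of deltalog size.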
diff --git a/tests/integration/compaction/l2_single_compaction_test.go b/tests/integration/compaction/l2_single_compaction_test.go
index a32911e57b..21cbfe01bf 100644
--- a/tests/integration/compaction/l2_single_compaction_test.go
+++ b/tests/integration/compaction/l2_single_compaction_test.go
@@ -19,6 +19,7 @@ package compaction
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -49,11 +50,6 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
 	defer cancel()
 	c := s.Cluster
 
-	paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
-	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
-	paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1")
-	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)
-
 	const (
 		dim    = 128
 		dbName = "default"
@@ -216,7 +212,10 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
 	segments, err = c.MetaWatcher.ShowSegments()
 	s.NoError(err)
 	s.NotEmpty(segments)
-	log.Info("ShowSegments result", zap.Any("segments", segments))
+
+	for _, segment := range segments {
+		log.Info("ShowSegments result", zap.Int64("id", segment.ID), zap.String("state", segment.GetState().String()), zap.String("level", segment.GetLevel().String()), zap.Int64("numOfRows", segment.GetNumOfRows()))
+	}
 	flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
 		return segment.GetState() == commonpb.SegmentState_Flushed
 	})
@@ -238,9 +237,48 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
 		}
 	}
 
+	checkQuerySegmentInfo := func() bool {
+		querySegmentInfo, err := c.Proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+		})
+		s.NoError(err)
+		return len(querySegmentInfo.GetInfos()) == 1
+	}
+
+	checkWaitGroup := sync.WaitGroup{}
+	checkWaitGroup.Add(1)
+	go func() {
+		defer checkWaitGroup.Done()
+		timeoutCtx, cancelFunc := context.WithTimeout(ctx, time.Minute*2)
+		defer cancelFunc()
+
+		for {
+			select {
+			case <-timeoutCtx.Done():
+				s.Fail("check query segment info timeout")
+				return
+			default:
+				if checkQuerySegmentInfo() {
+					return
+				}
+			}
+			time.Sleep(time.Second * 3)
+		}
+	}()
+
+	checkWaitGroup.Wait()
+
 	log.Info("TestL2SingleCompaction succeed")
 }
 
 func TestL2SingleCompaction(t *testing.T) {
+	paramtable.Init()
+	// to speed up the test
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10")
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)
+
 	suite.Run(t, new(L2SingleCompactionSuite))
 }
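The goroutine-plus-WaitGroup check above could also be written as a small reusable poller; a sketch assuming only the standard library context and time packages (waitFor is not part of the Milvus test utilities):

// waitFor polls cond every interval until it returns true or the timeout
// elapses, and reports whether cond ever succeeded.
func waitFor(ctx context.Context, timeout, interval time.Duration, cond func() bool) bool {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if cond() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
		}
	}
}

With such a helper, the block above would collapse to s.True(waitFor(ctx, 2*time.Minute, 3*time.Second, checkQuerySegmentInfo)).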