diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 5ceb36c3a7..d505d346f6 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -412,7 +412,7 @@ func (c *compactionPlanHandler) handleL0CompactionResult(plan *datapb.Compaction }) for _, seg := range levelZeroSegments { - operators = append(operators, UpdateStatusOperator(seg.SegmentID, commonpb.SegmentState_Dropped)) + operators = append(operators, UpdateStatusOperator(seg.GetSegmentID(), commonpb.SegmentState_Dropped), UpdateCompactedOperator(seg.GetSegmentID())) } log.Info("meta update: update segments info for level zero compaction", diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 009bf73e82..5058888015 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -128,9 +128,9 @@ func (s *CompactionPlanHandlerSuite) TestClean() { func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() { channel := "Ch-1" - s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Run(func(operators ...UpdateOperator) { - s.Equal(5, len(operators)) + s.Equal(7, len(operators)) }).Return(nil).Once() deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))} diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 7eb1e8ddff..68c422ee86 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -521,6 +521,19 @@ func UpdateStatusOperator(segmentID int64, status commonpb.SegmentState) UpdateO } } +func UpdateCompactedOperator(segmentID int64) UpdateOperator { + return func(modPack *updateSegmentPack) bool { + segment := modPack.Get(segmentID) + if segment == nil { + log.Warn("meta update: update binlog failed - segment not found", + zap.Int64("segmentID", segmentID)) + return false + } + segment.Compacted = true + return true + } +} + // update binlogs in segmentInfo func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator { return func(modPack *updateSegmentPack) bool { @@ -1032,7 +1045,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan, PartitionID: modSegments[0].PartitionID, InsertChannel: modSegments[0].InsertChannel, NumOfRows: compactToSegment.NumOfRows, - State: commonpb.SegmentState_Flushing, + State: commonpb.SegmentState_Flushed, MaxRowNum: modSegments[0].MaxRowNum, Binlogs: compactToSegment.GetInsertLogs(), Statslogs: compactToSegment.GetField2StatslogPaths(), diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 7d8ab5bb45..a744416087 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -533,6 +533,30 @@ func TestUpdateSegmentsInfo(t *testing.T) { assert.Equal(t, updated.NumOfRows, expected.NumOfRows) }) + t.Run("update compacted segment", func(t *testing.T) { + meta, err := newMemoryMeta() + assert.NoError(t, err) + + // segment not found + err = meta.UpdateSegmentsInfo( + UpdateCompactedOperator(1), + ) + assert.NoError(t, err) + + // normal + segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: 1, State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getInsertLogPath("binlog0", 1))}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog0", 1))}, + }} + err = meta.AddSegment(context.TODO(), segment1) + assert.NoError(t, err) + + err = meta.UpdateSegmentsInfo( + UpdateCompactedOperator(1), + ) + assert.NoError(t, err) + }) t.Run("update non-existed segment", func(t *testing.T) { meta, err := newMemoryMeta() assert.NoError(t, err) @@ -745,7 +769,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { assert.Equal(t, UniqueID(100), newSegment.GetCollectionID()) assert.Equal(t, UniqueID(10), newSegment.GetPartitionID()) assert.Equal(t, inSegment.NumOfRows, newSegment.GetNumOfRows()) - assert.Equal(t, commonpb.SegmentState_Flushing, newSegment.GetState()) + assert.Equal(t, commonpb.SegmentState_Flushed, newSegment.GetState()) assert.EqualValues(t, inSegment.GetInsertLogs(), newSegment.GetBinlogs()) assert.EqualValues(t, inSegment.GetField2StatslogPaths(), newSegment.GetStatslogs())