diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index ed8c5ac925..38888d46b9 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -365,6 +365,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
 			break
 		}
 
+		group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
+
 		isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
 		if err != nil {
 			log.Warn("failed to update segment max size", zap.Error(err))
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 2379f79324..b635fb02ab 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) {
			case <-time.After(2 * time.Second):
				hasPlan = false
			}
-			assert.Equal(t, true, hasPlan)
+			assert.Equal(t, false, hasPlan)
		})

		t.Run(tt.name+" with meta error", func(t *testing.T) {
@@ -1033,8 +1033,13 @@ func Test_compactionTrigger_noplan(t *testing.T) {
			err := tr.triggerCompaction()
			assert.Equal(t, tt.wantErr, err != nil)
			spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
-			plan := <-spy.spyChan
-			assert.Equal(t, len(plan.SegmentBinlogs), 4)
+			select {
+			case val := <-spy.spyChan:
+				assert.Fail(t, "we expect no compaction generated", val)
+				return
+			case <-time.After(3 * time.Second):
+				return
+			}
		})
	}
 }
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index 9b384b82d3..8d6bf0787d 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
@@ -137,24 +137,14 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
			unIndexedIDs.Insert(s.GetID())
		}
	}
-	hasUnIndexed := true
-	for hasUnIndexed {
-		hasUnIndexed = false
-		for id := range unIndexedIDs {
-			// Indexed segments are compacted to a raw segment,
-			// replace it with the indexed ones
-			if len(segmentInfos[id].GetCompactionFrom()) > 0 {
-				unIndexedIDs.Remove(id)
-				for _, segID := range segmentInfos[id].GetCompactionFrom() {
-					if indexed.Contain(segID) {
-						indexedIDs.Insert(segID)
-					} else {
-						unIndexedIDs.Insert(segID)
-						hasUnIndexed = true
-					}
-				}
-				droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
-			}
+	for id := range unIndexedIDs {
+		// Indexed segments are compacted to a raw segment,
+		// replace it with the indexed ones
+		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
+			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
+			unIndexedIDs.Remove(id)
+			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
+			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
		}
	}

diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index a8bd4eb2bc..46d287c91c 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -2644,75 +2644,6 @@ func TestGetRecoveryInfo(t *testing.T) {
		assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0])
	})

-	t.Run("with continuous compaction", func(t *testing.T) {
-		svr := newTestServer(t, nil)
-		defer closeTestServer(t, svr)
-
-		svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
-			return newMockRootCoordService(), nil
-		}
-
-		svr.meta.AddCollection(&collectionInfo{
-			ID:     0,
-			Schema: newTestSchema(),
-		})
-
-		err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{
-			ChannelName: "vchan1",
-			Timestamp:   0,
-		})
-		assert.NoError(t, err)
-
-		seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped)
-		seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
-		seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
-		seg3.CompactionFrom = []int64{9, 10}
-		seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
-		seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
-		seg5.CompactionFrom = []int64{11, 12}
-		err = svr.meta.AddSegment(NewSegmentInfo(seg1))
-		assert.Nil(t, err)
-		err = svr.meta.AddSegment(NewSegmentInfo(seg2))
-		assert.Nil(t, err)
-		err = svr.meta.AddSegment(NewSegmentInfo(seg3))
-		assert.Nil(t, err)
-		err = svr.meta.AddSegment(NewSegmentInfo(seg4))
-		assert.Nil(t, err)
-		err = svr.meta.AddSegment(NewSegmentInfo(seg5))
-		assert.Nil(t, err)
-		mockResp := &indexpb.GetIndexInfoResponse{
-			Status: &commonpb.Status{},
-			SegmentInfo: map[int64]*indexpb.SegmentInfo{
-				seg4.ID: {
-					CollectionID: seg4.CollectionID,
-					SegmentID:    seg4.ID,
-					EnableIndex:  true,
-					IndexInfos: []*indexpb.IndexFilePathInfo{
-						{
-							SegmentID: seg4.ID,
-							FieldID:   2,
-						},
-					},
-				},
-			},
-		}
-		svr.indexCoord = mocks.NewMockIndexCoord(t)
-		svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil)
-
-		req := &datapb.GetRecoveryInfoRequest{
-			CollectionID: 0,
-			PartitionID:  0,
-		}
-		resp, err := svr.GetRecoveryInfo(context.TODO(), req)
-		assert.Nil(t, err)
-		assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
-		assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
-		assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
-		assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
-		assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds())
-		assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds())
-	})
-
	t.Run("with closed server", func(t *testing.T) {
		svr := newTestServer(t, nil)
		closeTestServer(t, svr)