mirror of https://github.com/milvus-io/milvus.git
parent
d63bf18236
commit
6313a45524
|
@ -365,6 +365,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
break
|
||||
}
|
||||
|
||||
group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
|
||||
|
||||
isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
|
||||
if err != nil {
|
||||
log.Warn("failed to update segment max size", zap.Error(err))
|
||||
|
|
|
@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) {
|
|||
case <-time.After(2 * time.Second):
|
||||
hasPlan = false
|
||||
}
|
||||
assert.Equal(t, true, hasPlan)
|
||||
assert.Equal(t, false, hasPlan)
|
||||
})
|
||||
|
||||
t.Run(tt.name+" with meta error", func(t *testing.T) {
|
||||
|
@ -1033,8 +1033,13 @@ func Test_compactionTrigger_noplan(t *testing.T) {
|
|||
err := tr.triggerCompaction()
|
||||
assert.Equal(t, tt.wantErr, err != nil)
|
||||
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
|
||||
plan := <-spy.spyChan
|
||||
assert.Equal(t, len(plan.SegmentBinlogs), 4)
|
||||
select {
|
||||
case val := <-spy.spyChan:
|
||||
assert.Fail(t, "we expect no compaction generated", val)
|
||||
return
|
||||
case <-time.After(3 * time.Second):
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -137,24 +137,14 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
|
|||
unIndexedIDs.Insert(s.GetID())
|
||||
}
|
||||
}
|
||||
hasUnIndexed := true
|
||||
for hasUnIndexed {
|
||||
hasUnIndexed = false
|
||||
for id := range unIndexedIDs {
|
||||
// Indexed segments are compacted to a raw segment,
|
||||
// replace it with the indexed ones
|
||||
if len(segmentInfos[id].GetCompactionFrom()) > 0 {
|
||||
unIndexedIDs.Remove(id)
|
||||
for _, segID := range segmentInfos[id].GetCompactionFrom() {
|
||||
if indexed.Contain(segID) {
|
||||
indexedIDs.Insert(segID)
|
||||
} else {
|
||||
unIndexedIDs.Insert(segID)
|
||||
hasUnIndexed = true
|
||||
}
|
||||
}
|
||||
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
|
||||
}
|
||||
for id := range unIndexedIDs {
|
||||
// Indexed segments are compacted to a raw segment,
|
||||
// replace it with the indexed ones
|
||||
if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
|
||||
indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
|
||||
unIndexedIDs.Remove(id)
|
||||
indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
|
||||
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2644,75 +2644,6 @@ func TestGetRecoveryInfo(t *testing.T) {
|
|||
assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0])
|
||||
})
|
||||
|
||||
t.Run("with continuous compaction", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
|
||||
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
|
||||
return newMockRootCoordService(), nil
|
||||
}
|
||||
|
||||
svr.meta.AddCollection(&collectionInfo{
|
||||
ID: 0,
|
||||
Schema: newTestSchema(),
|
||||
})
|
||||
|
||||
err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{
|
||||
ChannelName: "vchan1",
|
||||
Timestamp: 0,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped)
|
||||
seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
|
||||
seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
|
||||
seg3.CompactionFrom = []int64{9, 10}
|
||||
seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
|
||||
seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
|
||||
seg5.CompactionFrom = []int64{11, 12}
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(seg1))
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(seg3))
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(seg4))
|
||||
assert.Nil(t, err)
|
||||
err = svr.meta.AddSegment(NewSegmentInfo(seg5))
|
||||
assert.Nil(t, err)
|
||||
mockResp := &indexpb.GetIndexInfoResponse{
|
||||
Status: &commonpb.Status{},
|
||||
SegmentInfo: map[int64]*indexpb.SegmentInfo{
|
||||
seg4.ID: {
|
||||
CollectionID: seg4.CollectionID,
|
||||
SegmentID: seg4.ID,
|
||||
EnableIndex: true,
|
||||
IndexInfos: []*indexpb.IndexFilePathInfo{
|
||||
{
|
||||
SegmentID: seg4.ID,
|
||||
FieldID: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
svr.indexCoord = mocks.NewMockIndexCoord(t)
|
||||
svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil)
|
||||
|
||||
req := &datapb.GetRecoveryInfoRequest{
|
||||
CollectionID: 0,
|
||||
PartitionID: 0,
|
||||
}
|
||||
resp, err := svr.GetRecoveryInfo(context.TODO(), req)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
|
||||
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
|
||||
assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
|
||||
assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds())
|
||||
assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds())
|
||||
})
|
||||
|
||||
t.Run("with closed server", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
closeTestServer(t, svr)
|
||||
|
|
Loading…
Reference in New Issue