mirror of https://github.com/milvus-io/milvus.git
the executor always fetches the latest segment info, so we could consume from the latest checkpoint, which could save much time while deleted many entities pr: #29455 Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yah2er0ne@outlook.com>pull/29546/head
parent
151a5c3ca8
commit
4c0ca83928
|
@ -151,7 +151,12 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
|
|||
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
|
||||
|
||||
channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.NextTarget)
|
||||
if channel == nil {
|
||||
channel = o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTarget)
|
||||
}
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil)
|
||||
|
||||
log.Debug("leader observer append a segment to set",
|
||||
zap.Int64("collectionID", leaderView.CollectionID),
|
||||
|
|
|
@ -136,7 +136,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
|
|||
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
|
||||
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
|
||||
observer.dist.LeaderViewManager.Update(2, view)
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil, nil)
|
||||
|
||||
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
|
||||
return &querypb.SyncDistributionRequest{
|
||||
|
@ -232,7 +232,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
|
|||
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
|
||||
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
|
||||
observer.dist.LeaderViewManager.Update(2, view)
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil, nil)
|
||||
|
||||
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
|
||||
return &querypb.SyncDistributionRequest{
|
||||
|
@ -368,7 +368,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
|
|||
view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
|
||||
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
|
||||
observer.dist.LeaderViewManager.Update(4, view2)
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, nil, nil)
|
||||
|
||||
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
|
||||
return &querypb.SyncDistributionRequest{
|
||||
|
|
|
@ -191,7 +191,12 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
|
|||
indexes = nil
|
||||
}
|
||||
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, indexes)
|
||||
channel := ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget)
|
||||
if channel == nil {
|
||||
channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.CurrentTarget)
|
||||
}
|
||||
|
||||
loadInfo := utils.PackSegmentLoadInfo(resp, channel.GetSeekPosition(), indexes)
|
||||
|
||||
// Get collection index info
|
||||
indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
|
||||
|
|
|
@ -436,7 +436,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
|
|||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
|
@ -532,7 +532,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
|
|||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
|
@ -622,7 +622,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
|
|||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
|
@ -998,7 +998,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
|
|||
}
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segmentInfos, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segmentInfos, nil)
|
||||
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition))
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
|
||||
|
@ -1084,7 +1084,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
|
|||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
|
||||
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition))
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
|
@ -1120,7 +1120,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
|
|||
bakExpectations := suite.broker.ExpectedCalls
|
||||
suite.broker.AssertExpectations(suite.T())
|
||||
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
|
||||
|
||||
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, 2))
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
|
@ -1326,7 +1326,7 @@ func (suite *TaskSuite) TestNoExecutor() {
|
|||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
|
||||
suite.target.UpdateCollectionNextTarget(suite.collection)
|
||||
segmentsNum := len(suite.loadSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
|
|
|
@ -72,34 +72,17 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
|
|||
|
||||
// packSegmentLoadInfo packs SegmentLoadInfo for given segment,
|
||||
// packs with index if withIndex is true, this fetch indexes from IndexCoord
|
||||
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
|
||||
var (
|
||||
deltaPosition *msgpb.MsgPosition
|
||||
positionSrc string
|
||||
)
|
||||
|
||||
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, checkpoint *msgpb.MsgPosition, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
|
||||
segment := resp.GetInfos()[0]
|
||||
|
||||
if resp.GetChannelCheckpoint() != nil && resp.ChannelCheckpoint[segment.InsertChannel] != nil {
|
||||
deltaPosition = resp.ChannelCheckpoint[segment.InsertChannel]
|
||||
positionSrc = "channelCheckpoint"
|
||||
} else if segment.GetDmlPosition() != nil {
|
||||
deltaPosition = segment.GetDmlPosition()
|
||||
positionSrc = "segmentDMLPos"
|
||||
} else {
|
||||
deltaPosition = segment.GetStartPosition()
|
||||
positionSrc = "segmentStartPos"
|
||||
}
|
||||
|
||||
posTime := tsoutil.PhysicalTime(deltaPosition.GetTimestamp())
|
||||
posTime := tsoutil.PhysicalTime(checkpoint.GetTimestamp())
|
||||
tsLag := time.Since(posTime)
|
||||
if tsLag >= 10*time.Minute {
|
||||
log.Warn("delta position is quite stale",
|
||||
zap.Int64("collectionID", segment.GetCollectionID()),
|
||||
zap.Int64("segmentID", segment.GetID()),
|
||||
zap.String("channel", segment.InsertChannel),
|
||||
zap.String("positionSource", positionSrc),
|
||||
zap.Uint64("posTs", deltaPosition.GetTimestamp()),
|
||||
zap.Uint64("posTs", checkpoint.GetTimestamp()),
|
||||
zap.Time("posTime", posTime),
|
||||
zap.Duration("tsLag", tsLag))
|
||||
}
|
||||
|
@ -114,7 +97,7 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb
|
|||
InsertChannel: segment.InsertChannel,
|
||||
IndexInfos: indexes,
|
||||
StartPosition: segment.GetStartPosition(),
|
||||
DeltaPosition: deltaPosition,
|
||||
DeltaPosition: checkpoint,
|
||||
}
|
||||
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
|
||||
return loadInfo
|
||||
|
|
|
@ -36,6 +36,13 @@ func Test_packLoadSegmentRequest(t *testing.T) {
|
|||
t1 := tsoutil.ComposeTSByTime(time.Now().Add(-8*time.Minute), 0)
|
||||
t2 := tsoutil.ComposeTSByTime(time.Now().Add(-5*time.Minute), 0)
|
||||
|
||||
channel := &datapb.VchannelInfo{
|
||||
SeekPosition: &msgpb.MsgPosition{
|
||||
ChannelName: mockPChannel,
|
||||
Timestamp: t2,
|
||||
},
|
||||
}
|
||||
|
||||
segmentInfo := &datapb.SegmentInfo{
|
||||
ID: 0,
|
||||
InsertChannel: mockVChannel,
|
||||
|
@ -43,43 +50,27 @@ func Test_packLoadSegmentRequest(t *testing.T) {
|
|||
ChannelName: mockPChannel,
|
||||
Timestamp: t1,
|
||||
},
|
||||
DmlPosition: &msgpb.MsgPosition{
|
||||
ChannelName: mockPChannel,
|
||||
Timestamp: t2,
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("test set deltaPosition from segment dmlPosition", func(t *testing.T) {
|
||||
t.Run("test set deltaPosition from channel", func(t *testing.T) {
|
||||
resp := &datapb.GetSegmentInfoResponse{
|
||||
Infos: []*datapb.SegmentInfo{
|
||||
proto.Clone(segmentInfo).(*datapb.SegmentInfo),
|
||||
},
|
||||
}
|
||||
req := PackSegmentLoadInfo(resp, nil)
|
||||
req := PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil)
|
||||
assert.NotNil(t, req.GetDeltaPosition())
|
||||
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
|
||||
assert.Equal(t, t2, req.GetDeltaPosition().Timestamp)
|
||||
})
|
||||
|
||||
t.Run("test set deltaPosition from segment startPosition", func(t *testing.T) {
|
||||
segInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
|
||||
segInfo.DmlPosition = nil
|
||||
resp := &datapb.GetSegmentInfoResponse{
|
||||
Infos: []*datapb.SegmentInfo{segInfo},
|
||||
}
|
||||
req := PackSegmentLoadInfo(resp, nil)
|
||||
assert.NotNil(t, req.GetDeltaPosition())
|
||||
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
|
||||
assert.Equal(t, t1, req.GetDeltaPosition().Timestamp)
|
||||
})
|
||||
|
||||
t.Run("test tsLag > 10minutes", func(t *testing.T) {
|
||||
segInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
|
||||
segInfo.DmlPosition.Timestamp = t0
|
||||
channel := proto.Clone(channel).(*datapb.VchannelInfo)
|
||||
channel.SeekPosition.Timestamp = t0
|
||||
resp := &datapb.GetSegmentInfoResponse{
|
||||
Infos: []*datapb.SegmentInfo{segInfo},
|
||||
Infos: []*datapb.SegmentInfo{segmentInfo},
|
||||
}
|
||||
req := PackSegmentLoadInfo(resp, nil)
|
||||
req := PackSegmentLoadInfo(resp, channel.GetSeekPosition(), nil)
|
||||
assert.NotNil(t, req.GetDeltaPosition())
|
||||
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
|
||||
assert.Equal(t, t0, req.GetDeltaPosition().Timestamp)
|
||||
|
|
Loading…
Reference in New Issue