fix: may miss stream delta while loading (#28871)

we consume the delta data from the lastest channel checkpoint while
loading segment,

this works well without level 0 segments, but now it may lead to miss
some delta data,

so we have to consume from the current target's channel checkpoint

related: #27349

---------

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/29006/head
yah01 2023-12-05 17:34:45 +08:00 committed by GitHub
parent bb82074937
commit fab52d167b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 74 deletions

View File

@ -151,7 +151,12 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
continue
}
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTarget)
if channel == nil {
channel = o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.NextTarget)
}
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), nil)
log.Debug("leader observer append a segment to set",
zap.Int64("collectionID", leaderView.CollectionID),

View File

@ -116,9 +116,6 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
PartitionID: 1,
InsertChannel: "test-insert-channel",
}
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{info},
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
@ -137,7 +134,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
@ -214,9 +211,6 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
PartitionID: 1,
InsertChannel: "test-insert-channel",
}
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{info},
}
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
@ -233,7 +227,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
@ -348,9 +342,6 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
PartitionID: 1,
InsertChannel: "test-insert-channel",
}
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{info},
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
@ -369,7 +360,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(4, view2)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)
loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{

View File

@ -274,10 +274,14 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
indexes = nil
}
loadInfo := utils.PackSegmentLoadInfo(resp, indexes)
channel := ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.CurrentTarget)
if channel == nil {
channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget)
}
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes)
// Get shard leader for the given replica and segment
leader, ok := getShardLeader(
// Get shard leaderID for the given replica and segment
leaderID, ok := getShardLeader(
ex.meta.ReplicaManager,
ex.dist,
task.CollectionID(),
@ -290,7 +294,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
log.Warn(msg, zap.Error(err))
return err
}
log = log.With(zap.Int64("shardLeader", leader))
log = log.With(zap.Int64("shardLeader", leaderID))
// Get collection index info
indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())

View File

@ -441,7 +441,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.target.UpdateCollectionNextTarget(suite.collection)
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
@ -487,7 +487,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
// Expect
suite.broker.EXPECT().DescribeCollection(mock.Anything, suite.collection).Return(&milvuspb.DescribeCollectionResponse{
Schema: &schemapb.CollectionSchema{
Name: "TestLoadSegmentTask",
Name: "TestLoadSegmentTaskNotIndex",
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
},
@ -539,7 +539,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.target.UpdateCollectionNextTarget(suite.collection)
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
@ -585,7 +585,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
// Expect
suite.broker.EXPECT().DescribeCollection(mock.Anything, suite.collection).Return(&milvuspb.DescribeCollectionResponse{
Schema: &schemapb.CollectionSchema{
Name: "TestLoadSegmentTask",
Name: "TestLoadSegmentTaskNotIndex",
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
},
@ -631,7 +631,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.target.UpdateCollectionNextTarget(suite.collection)
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
@ -1011,7 +1011,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
}
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segmentInfos, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segmentInfos, nil)
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition))
suite.target.UpdateCollectionNextTarget(suite.collection)
@ -1099,7 +1099,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, partition))
suite.target.UpdateCollectionNextTarget(suite.collection)
segmentsNum := len(suite.loadSegments)
@ -1135,7 +1135,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
bakExpectations := suite.broker.ExpectedCalls
suite.broker.AssertExpectations(suite.T())
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collection, 2))
suite.target.UpdateCollectionNextTarget(suite.collection)
@ -1341,7 +1341,7 @@ func (suite *TaskSuite) TestNoExecutor() {
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
suite.target.UpdateCollectionNextTarget(suite.collection)
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)

View File

@ -72,34 +72,15 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
// packSegmentLoadInfo packs SegmentLoadInfo for given segment,
// packs with index if withIndex is true, this fetch indexes from IndexCoord
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
var (
deltaPosition *msgpb.MsgPosition
positionSrc string
)
segment := resp.GetInfos()[0]
if resp.GetChannelCheckpoint() != nil && resp.ChannelCheckpoint[segment.InsertChannel] != nil {
deltaPosition = resp.ChannelCheckpoint[segment.InsertChannel]
positionSrc = "channelCheckpoint"
} else if segment.GetDmlPosition() != nil {
deltaPosition = segment.GetDmlPosition()
positionSrc = "segmentDMLPos"
} else {
deltaPosition = segment.GetStartPosition()
positionSrc = "segmentStartPos"
}
posTime := tsoutil.PhysicalTime(deltaPosition.GetTimestamp())
func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.MsgPosition, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
posTime := tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())
tsLag := time.Since(posTime)
if tsLag >= 10*time.Minute {
log.Warn("delta position is quite stale",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("segmentID", segment.GetID()),
zap.String("channel", segment.InsertChannel),
zap.String("positionSource", positionSrc),
zap.Uint64("posTs", deltaPosition.GetTimestamp()),
zap.Uint64("posTs", channelCheckpoint.GetTimestamp()),
zap.Time("posTime", posTime),
zap.Duration("tsLag", tsLag))
}
@ -114,7 +95,7 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: deltaPosition,
DeltaPosition: channelCheckpoint,
Level: segment.GetLevel(),
}
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)

View File

@ -49,37 +49,25 @@ func Test_packLoadSegmentRequest(t *testing.T) {
},
}
t.Run("test set deltaPosition from segment dmlPosition", func(t *testing.T) {
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{
proto.Clone(segmentInfo).(*datapb.SegmentInfo),
},
}
req := PackSegmentLoadInfo(resp, nil)
channel := &datapb.VchannelInfo{
ChannelName: mockVChannel,
SeekPosition: &msgpb.MsgPosition{
ChannelName: mockPChannel,
Timestamp: t2,
},
}
t.Run("test set deltaPosition from channel seek position", func(t *testing.T) {
req := PackSegmentLoadInfo(segmentInfo, channel.GetSeekPosition(), nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t2, req.GetDeltaPosition().Timestamp)
})
t.Run("test set deltaPosition from segment startPosition", func(t *testing.T) {
segInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
segInfo.DmlPosition = nil
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{segInfo},
}
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t1, req.GetDeltaPosition().Timestamp)
})
t.Run("test tsLag > 10minutes", func(t *testing.T) {
segInfo := proto.Clone(segmentInfo).(*datapb.SegmentInfo)
segInfo.DmlPosition.Timestamp = t0
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{segInfo},
}
req := PackSegmentLoadInfo(resp, nil)
channel := proto.Clone(channel).(*datapb.VchannelInfo)
channel.SeekPosition.Timestamp = t0
req := PackSegmentLoadInfo(segmentInfo, channel.GetSeekPosition(), nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t0, req.GetDeltaPosition().Timestamp)