Fix data loss after balance task rescheduled (#14096)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/14152/head
xige-16 2021-12-24 10:59:31 +08:00 committed by GitHub
parent 5d3eb75762
commit e3771bee3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 3 deletions

View File

@ -1061,8 +1061,11 @@ func (lst *loadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
}
lst.excludeNodeIDs = append(lst.excludeNodeIDs, lst.DstNodeID)
//TODO:: wait or not according msgType
reScheduledTasks, err := assignInternalTask(ctx, lst.getParentTask(), lst.meta, lst.cluster, loadSegmentReqs, nil, false, lst.excludeNodeIDs, nil)
wait2AssignTaskSuccess := false
if lst.getParentTask().getTriggerCondition() == querypb.TriggerCondition_NodeDown {
wait2AssignTaskSuccess = true
}
reScheduledTasks, err := assignInternalTask(ctx, lst.getParentTask(), lst.meta, lst.cluster, loadSegmentReqs, nil, wait2AssignTaskSuccess, lst.excludeNodeIDs, nil)
if err != nil {
log.Error("loadSegment reschedule failed", zap.Int64s("excludeNodes", lst.excludeNodeIDs), zap.Int64("taskID", lst.getTaskID()), zap.Error(err))
return nil, err
@ -1236,7 +1239,11 @@ func (wdt *watchDmChannelTask) reschedule(ctx context.Context) ([]task, error) {
wdt.excludeNodeIDs = []int64{}
}
wdt.excludeNodeIDs = append(wdt.excludeNodeIDs, wdt.NodeID)
reScheduledTasks, err := assignInternalTask(ctx, wdt.parentTask, wdt.meta, wdt.cluster, nil, watchDmChannelReqs, false, wdt.excludeNodeIDs, nil)
wait2AssignTaskSuccess := false
if wdt.getParentTask().getTriggerCondition() == querypb.TriggerCondition_NodeDown {
wait2AssignTaskSuccess = true
}
reScheduledTasks, err := assignInternalTask(ctx, wdt.parentTask, wdt.meta, wdt.cluster, nil, watchDmChannelReqs, wait2AssignTaskSuccess, wdt.excludeNodeIDs, nil)
if err != nil {
log.Error("watchDmChannel reschedule failed", zap.Int64("taskID", wdt.getTaskID()), zap.Int64s("excludeNodes", wdt.excludeNodeIDs), zap.Error(err))
return nil, err

View File

@ -1072,6 +1072,112 @@ func TestLoadBalancePartitionAfterNodeDown(t *testing.T) {
assert.Nil(t, err)
}
func TestLoadBalanceAndReschedulSegmentTaskAfterNodeDown(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node1, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
loadCollectionTask := genLoadCollectionTask(ctx, queryCoord)
err = queryCoord.scheduler.Enqueue(loadCollectionTask)
assert.Nil(t, err)
waitTaskFinalState(loadCollectionTask, taskExpired)
node2, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
node2.loadSegment = returnFailedResult
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
removeNodeSession(node1.queryNodeID)
for {
_, activeTaskValues, err := queryCoord.scheduler.client.LoadWithPrefix(activeTaskPrefix)
assert.Nil(t, err)
if len(activeTaskValues) != 0 {
break
}
}
node3, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node3.queryNodeID)
segmentInfos := queryCoord.meta.getSegmentInfosByNode(node3.queryNodeID)
for _, segmentInfo := range segmentInfos {
if segmentInfo.NodeID == node3.queryNodeID {
break
}
}
for {
_, triggrtTaskValues, err := queryCoord.scheduler.client.LoadWithPrefix(triggerTaskPrefix)
assert.Nil(t, err)
if len(triggrtTaskValues) == 0 {
break
}
}
err = removeAllSession()
assert.Nil(t, err)
}
func TestLoadBalanceAndReschedulDmChannelTaskAfterNodeDown(t *testing.T) {
refreshParams()
ctx := context.Background()
queryCoord, err := startQueryCoord(ctx)
assert.Nil(t, err)
node1, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
loadCollectionTask := genLoadCollectionTask(ctx, queryCoord)
err = queryCoord.scheduler.Enqueue(loadCollectionTask)
assert.Nil(t, err)
waitTaskFinalState(loadCollectionTask, taskExpired)
node2, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
node2.watchDmChannels = returnFailedResult
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
removeNodeSession(node1.queryNodeID)
for {
_, activeTaskValues, err := queryCoord.scheduler.client.LoadWithPrefix(activeTaskPrefix)
assert.Nil(t, err)
if len(activeTaskValues) != 0 {
break
}
}
node3, err := startQueryNodeServer(ctx)
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node3.queryNodeID)
dmChannelInfos := queryCoord.meta.getDmChannelInfosByNodeID(node3.queryNodeID)
for _, channelInfo := range dmChannelInfos {
if channelInfo.NodeIDLoaded == node3.queryNodeID {
break
}
}
for {
_, triggrtTaskValues, err := queryCoord.scheduler.client.LoadWithPrefix(triggerTaskPrefix)
assert.Nil(t, err)
if len(triggrtTaskValues) == 0 {
break
}
}
err = removeAllSession()
assert.Nil(t, err)
}
func TestMergeWatchDeltaChannelInfo(t *testing.T) {
infos := []*datapb.VchannelInfo{
{