mirror of https://github.com/milvus-io/milvus.git
Don't remove nodeInfo when querycoord panic at loadBalanceTask (#15189)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/15181/head
parent
892763b823
commit
384d3169d3
|
@ -1676,6 +1676,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
toRecoverPartitionIDs, err = showPartitions(ctx, collectionID, lbt.rootCoord)
|
||||
if err != nil {
|
||||
log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("collectionID", collectionID), zap.Error(err))
|
||||
lbt.setResultInfo(err)
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
|
@ -1687,6 +1688,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
vChannelInfos, binlogs, err := getRecoveryInfo(lbt.ctx, lbt.dataCoord, collectionID, partitionID)
|
||||
if err != nil {
|
||||
log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
|
||||
lbt.setResultInfo(err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
|
@ -1728,6 +1730,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
deltaChannel, err := generateWatchDeltaChannelInfo(info)
|
||||
if err != nil {
|
||||
log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err))
|
||||
lbt.setResultInfo(err)
|
||||
panic(err)
|
||||
}
|
||||
deltaChannelInfos = append(deltaChannelInfos, deltaChannel)
|
||||
|
@ -1740,6 +1743,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
err = lbt.meta.setDeltaChannel(collectionID, mergedDeltaChannel)
|
||||
if err != nil {
|
||||
log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("collectionID", collectionID), zap.Error(err))
|
||||
lbt.setResultInfo(err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
|
@ -1766,6 +1770,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
|||
internalTasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, true, lbt.SourceNodeIDs, lbt.DstNodeIDs)
|
||||
if err != nil {
|
||||
log.Error("loadBalanceTask: assign child task failed", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
|
||||
lbt.setResultInfo(err)
|
||||
panic(err)
|
||||
}
|
||||
for _, internalTask := range internalTasks {
|
||||
|
@ -1932,7 +1937,11 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
|
|||
if lbt.getResultInfo().ErrorCode != commonpb.ErrorCode_Success {
|
||||
lbt.clearChildTasks()
|
||||
}
|
||||
if lbt.triggerCondition == querypb.TriggerCondition_NodeDown {
|
||||
|
||||
// if loadBalanceTask execute failed after query node down, the lbt.getResultInfo().ErrorCode will be set to commonpb.ErrorCode_UnexpectedError
|
||||
// then the queryCoord will panic, and the nodeInfo should not be removed immediately
|
||||
// after queryCoord recovery, the balanceTask will redo
|
||||
if lbt.triggerCondition == querypb.TriggerCondition_NodeDown && lbt.getResultInfo().ErrorCode == commonpb.ErrorCode_Success {
|
||||
for _, id := range lbt.SourceNodeIDs {
|
||||
err := lbt.cluster.removeNodeInfo(id)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue