Panic when recover querynode failed (#15161)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/15164/head
xige-16 2022-01-12 17:43:37 +08:00 committed by GitHub
parent 0c98f21d4d
commit 4e956ee10c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 39 deletions

View File

@ -342,29 +342,12 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
defer lct.reduceRetryCount()
collectionID := lct.CollectionID
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionID: collectionID,
}
ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC)
defer cancel2()
showPartitionResponse, err := lct.rootCoord.ShowPartitions(ctx2, showPartitionRequest)
toLoadPartitionIDs, err := showPartitions(ctx, collectionID, lct.rootCoord)
if err != nil {
log.Error("loadCollectionTask: showPartition failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err))
lct.setResultInfo(err)
return err
}
if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(showPartitionResponse.Status.Reason)
log.Error("loadCollectionTask: showPartition failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err))
lct.setResultInfo(err)
return err
}
toLoadPartitionIDs := showPartitionResponse.PartitionIDs
log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toLoadPartitionIDs), zap.Int64("msgID", lct.Base.MsgID))
loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
@ -1690,20 +1673,11 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
var toRecoverPartitionIDs []UniqueID
if collectionInfo.LoadType == querypb.LoadType_loadCollection {
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionID: collectionID,
}
showPartitionResponse, err := lbt.rootCoord.ShowPartitions(ctx, showPartitionRequest)
toRecoverPartitionIDs, err = showPartitions(ctx, collectionID, lbt.rootCoord)
if err != nil {
lbt.setResultInfo(err)
return err
log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("collectionID", collectionID), zap.Error(err))
panic(err)
}
//TODO:: queryNode receive dm message according partitionID cache
//TODO:: queryNode add partitionID to cache if receive create partition message from dmChannel
toRecoverPartitionIDs = showPartitionResponse.PartitionIDs
} else {
toRecoverPartitionIDs = collectionInfo.PartitionIDs
}
@ -1713,8 +1687,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
vChannelInfos, binlogs, err := getRecoveryInfo(lbt.ctx, lbt.dataCoord, collectionID, partitionID)
if err != nil {
log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
lbt.setResultInfo(err)
return err
panic(err)
}
for _, segmentBingLog := range binlogs {
@ -1755,8 +1728,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
deltaChannel, err := generateWatchDeltaChannelInfo(info)
if err != nil {
log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err))
lbt.setResultInfo(err)
return err
panic(err)
}
deltaChannelInfos = append(deltaChannelInfos, deltaChannel)
dmChannelInfos = append(dmChannelInfos, info)
@ -1768,8 +1740,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
err = lbt.meta.setDeltaChannel(collectionID, mergedDeltaChannel)
if err != nil {
log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("collectionID", collectionID), zap.Error(err))
lbt.setResultInfo(err)
return err
panic(err)
}
mergedDmChannel := mergeDmChannelInfo(dmChannelInfos)
@ -1794,9 +1765,8 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
}
internalTasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, true, lbt.SourceNodeIDs, lbt.DstNodeIDs)
if err != nil {
log.Warn("loadBalanceTask: assign child task failed", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
lbt.setResultInfo(err)
return err
log.Error("loadBalanceTask: assign child task failed", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
panic(err)
}
for _, internalTask := range internalTasks {
lbt.addChildTask(internalTask)
@ -2125,6 +2095,28 @@ func getRecoveryInfo(ctx context.Context, dataCoord types.DataCoord, collectionI
return recoveryInfo.Channels, recoveryInfo.Binlogs, nil
}
func showPartitions(ctx context.Context, collectionID UniqueID, rootCoord types.RootCoord) ([]UniqueID, error) {
ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC)
defer cancel2()
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ShowPartitions,
},
CollectionID: collectionID,
}
showPartitionResponse, err := rootCoord.ShowPartitions(ctx2, showPartitionRequest)
if err != nil {
return nil, err
}
if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(showPartitionResponse.Status.Reason)
return nil, err
}
return showPartitionResponse.PartitionIDs, nil
}
func mergeDmChannelInfo(infos []*datapb.VchannelInfo) map[string]*datapb.VchannelInfo {
minPositions := make(map[string]*datapb.VchannelInfo)
for _, info := range infos {

View File

@ -1299,3 +1299,15 @@ func TestUpdateTaskProcessWhenWatchDmChannel(t *testing.T) {
err = removeAllSession()
assert.Nil(t, err)
}
func TestShowPartitions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
rootCoord := newRootCoordMock()
rootCoord.createCollection(defaultCollectionID)
rootCoord.createPartition(defaultCollectionID, defaultPartitionID)
partitionIDs, err := showPartitions(ctx, defaultCollectionID, rootCoord)
assert.Nil(t, err)
assert.Equal(t, 2, len(partitionIDs))
cancel()
}