From 4e956ee10c9725c1ec85e88aadda170fb0edbc18 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Wed, 12 Jan 2022 17:43:37 +0800 Subject: [PATCH] Panic when recover querynode failed (#15161) Signed-off-by: xige-16 --- internal/querycoord/task.go | 70 ++++++++++++++------------------ internal/querycoord/task_test.go | 12 ++++++ 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 4758ec4874..2b16baea69 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -342,29 +342,12 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error { defer lct.reduceRetryCount() collectionID := lct.CollectionID - showPartitionRequest := &milvuspb.ShowPartitionsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ShowPartitions, - }, - CollectionID: collectionID, - } - ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC) - defer cancel2() - showPartitionResponse, err := lct.rootCoord.ShowPartitions(ctx2, showPartitionRequest) + toLoadPartitionIDs, err := showPartitions(ctx, collectionID, lct.rootCoord) if err != nil { log.Error("loadCollectionTask: showPartition failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err)) lct.setResultInfo(err) return err } - - if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(showPartitionResponse.Status.Reason) - log.Error("loadCollectionTask: showPartition failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err)) - lct.setResultInfo(err) - return err - } - - toLoadPartitionIDs := showPartitionResponse.PartitionIDs log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toLoadPartitionIDs), zap.Int64("msgID", lct.Base.MsgID)) loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0) @@ -1690,20 +1673,11 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { var toRecoverPartitionIDs []UniqueID if collectionInfo.LoadType == querypb.LoadType_loadCollection { - showPartitionRequest := &milvuspb.ShowPartitionsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ShowPartitions, - }, - CollectionID: collectionID, - } - showPartitionResponse, err := lbt.rootCoord.ShowPartitions(ctx, showPartitionRequest) + toRecoverPartitionIDs, err = showPartitions(ctx, collectionID, lbt.rootCoord) if err != nil { - lbt.setResultInfo(err) - return err + log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + panic(err) } - //TODO:: queryNode receive dm message according partitionID cache - //TODO:: queryNode add partitionID to cache if receive create partition message from dmChannel - toRecoverPartitionIDs = showPartitionResponse.PartitionIDs } else { toRecoverPartitionIDs = collectionInfo.PartitionIDs } @@ -1713,8 +1687,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { vChannelInfos, binlogs, err := getRecoveryInfo(lbt.ctx, lbt.dataCoord, collectionID, partitionID) if err != nil { log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) - lbt.setResultInfo(err) - return err + panic(err) } for _, segmentBingLog := range binlogs { @@ -1755,8 +1728,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err)) - lbt.setResultInfo(err) - return err + panic(err) } deltaChannelInfos = append(deltaChannelInfos, deltaChannel) dmChannelInfos = append(dmChannelInfos, info) @@ -1768,8 +1740,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { err = lbt.meta.setDeltaChannel(collectionID, mergedDeltaChannel) if err != nil { log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - lbt.setResultInfo(err) - return err + panic(err) } mergedDmChannel := mergeDmChannelInfo(dmChannelInfos) @@ -1794,9 +1765,8 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { } internalTasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, true, lbt.SourceNodeIDs, lbt.DstNodeIDs) if err != nil { - log.Warn("loadBalanceTask: assign child task failed", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) - lbt.setResultInfo(err) - return err + log.Error("loadBalanceTask: assign child task failed", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) + panic(err) } for _, internalTask := range internalTasks { lbt.addChildTask(internalTask) @@ -2125,6 +2095,28 @@ func getRecoveryInfo(ctx context.Context, dataCoord types.DataCoord, collectionI return recoveryInfo.Channels, recoveryInfo.Binlogs, nil } +func showPartitions(ctx context.Context, collectionID UniqueID, rootCoord types.RootCoord) ([]UniqueID, error) { + ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC) + defer cancel2() + showPartitionRequest := &milvuspb.ShowPartitionsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_ShowPartitions, + }, + CollectionID: collectionID, + } + showPartitionResponse, err := rootCoord.ShowPartitions(ctx2, showPartitionRequest) + if err != nil { + return nil, err + } + + if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success { + err = errors.New(showPartitionResponse.Status.Reason) + return nil, err + } + + return showPartitionResponse.PartitionIDs, nil +} + func mergeDmChannelInfo(infos []*datapb.VchannelInfo) map[string]*datapb.VchannelInfo { minPositions := make(map[string]*datapb.VchannelInfo) for _, info := range infos { diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index fb787db4c7..e0646e5524 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -1299,3 +1299,15 @@ func TestUpdateTaskProcessWhenWatchDmChannel(t *testing.T) { err = removeAllSession() assert.Nil(t, err) } + +func TestShowPartitions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + rootCoord := newRootCoordMock() + rootCoord.createCollection(defaultCollectionID) + rootCoord.createPartition(defaultCollectionID, defaultPartitionID) + + partitionIDs, err := showPartitions(ctx, defaultCollectionID, rootCoord) + assert.Nil(t, err) + assert.Equal(t, 2, len(partitionIDs)) + cancel() +}