Make the `UnsubDmChannel` rpc unsuccessful when the releaseCollectionTask fails (#21469)

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/21486/head
SimFG 2023-01-02 16:39:32 +08:00 committed by GitHub
parent 772e5a4a3b
commit 289df1cc3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 12 deletions

View File

@ -433,19 +433,19 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
}
log.Info("unsubDmChannel(ReleaseCollection) enqueue done", zap.Int64("collectionID", req.GetCollectionID()))
func() {
err = dct.WaitToFinish()
if err != nil {
log.Warn("failed to do subscribe channel task successfully", zap.Error(err))
return
}
log.Info("unsubDmChannel(ReleaseCollection) WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID()))
}()
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
err = dct.WaitToFinish()
if err != nil {
log.Warn("failed to do subscribe channel task successfully", zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
}
return status, nil
log.Info("unsubDmChannel(ReleaseCollection) WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// LoadSegments load historical data into query node, historical data can be vector data or index

View File

@ -212,6 +212,54 @@ func TestImpl_UnsubDmChannel(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
t.Run("normal run", func(t *testing.T) {
schema := genTestCollectionSchema()
req := &queryPb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
TargetID: node.session.ServerID,
},
NodeID: 0,
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: schema,
Infos: []*datapb.VchannelInfo{
{
CollectionID: 1000,
ChannelName: "1000-dmc0",
},
},
}
status, err := node.WatchDmChannels(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
{
req := &queryPb.UnsubDmChannelRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_UnsubDmChannel,
MsgID: rand.Int63(),
TargetID: node.session.ServerID,
},
NodeID: 0,
CollectionID: defaultCollectionID,
ChannelName: "1000-dmc0",
}
originMetaReplica := node.metaReplica
node.metaReplica = newMockReplicaInterface()
status, err := node.UnsubDmChannel(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
node.metaReplica = originMetaReplica
status, err = node.UnsubDmChannel(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
}
})
t.Run("target not match", func(t *testing.T) {
req := &queryPb.UnsubDmChannelRequest{
Base: &commonpb.MsgBase{