mirror of https://github.com/milvus-io/milvus.git
Remove redundant check and add addQueryChannel unittest for query node (#7944)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/8029/head
parent
8e2eb3b29b
commit
b465e8f9a4
|
@ -124,11 +124,6 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
|
|||
log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
|
||||
|
||||
// message stream need to asConsumer before start
|
||||
// add search collection
|
||||
if !node.queryService.hasQueryCollection(collectionID) {
|
||||
node.queryService.addQueryCollection(collectionID)
|
||||
log.Debug("add query collection", zap.Any("collectionID", collectionID))
|
||||
}
|
||||
sc.start()
|
||||
log.Debug("start query collection", zap.Any("collectionID", collectionID))
|
||||
|
||||
|
|
|
@ -67,28 +67,47 @@ func TestImpl_GetStatisticsChannel(t *testing.T) {
|
|||
func TestImpl_AddQueryChannel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
t.Run("test addQueryChannel", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
req := &queryPb.AddQueryChannelRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_WatchQueryChannels,
|
||||
MsgID: rand.Int63(),
|
||||
},
|
||||
NodeID: 0,
|
||||
CollectionID: defaultCollectionID,
|
||||
RequestChannelID: genQueryChannel(),
|
||||
ResultChannelID: genQueryResultChannel(),
|
||||
}
|
||||
req := &queryPb.AddQueryChannelRequest{
|
||||
Base: genCommonMsgBase(commonpb.MsgType_WatchQueryChannels),
|
||||
NodeID: 0,
|
||||
CollectionID: defaultCollectionID,
|
||||
RequestChannelID: genQueryChannel(),
|
||||
ResultChannelID: genQueryResultChannel(),
|
||||
}
|
||||
|
||||
status, err := node.AddQueryChannel(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
status, err := node.AddQueryChannel(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
})
|
||||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err = node.AddQueryChannel(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
t.Run("test node is abnormal", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err := node.AddQueryChannel(ctx, nil)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("test nil query service", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
req := &queryPb.AddQueryChannelRequest{
|
||||
Base: genCommonMsgBase(commonpb.MsgType_WatchQueryChannels),
|
||||
CollectionID: defaultCollectionID,
|
||||
}
|
||||
|
||||
node.queryService = nil
|
||||
status, err := node.AddQueryChannel(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
})
|
||||
}
|
||||
|
||||
func TestImpl_RemoveQueryChannel(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue