From 3ff0112e494ed8d213b67b1343c3ec0f7b7799f0 Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Thu, 27 Oct 2022 10:05:31 +0800 Subject: [PATCH] Fix querynode panic occasionally (#20106) Signed-off-by: longjiquan Signed-off-by: longjiquan --- .../querycoordv2/meta/coordinator_broker.go | 12 ++- .../meta/coordinator_broker_test.go | 78 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 internal/querycoordv2/meta/coordinator_broker_test.go diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 12a0342615..808161a580 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -72,12 +72,20 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec req := &milvuspb.DescribeCollectionRequest{ Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_GetDistribution), + commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), ), CollectionID: collectionID, } resp, err := broker.rootCoord.DescribeCollection(ctx, req) - return resp.GetSchema(), err + if err != nil { + return nil, err + } + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + err = errors.New(resp.GetStatus().GetReason()) + log.Error("failed to get collection schema", zap.Int64("collectionID", collectionID), zap.Error(err)) + return nil, err + } + return resp.GetSchema(), nil } func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) { diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go new file mode 100644 index 0000000000..7352f6bbea --- /dev/null +++ b/internal/querycoordv2/meta/coordinator_broker_test.go @@ -0,0 +1,78 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta + +import ( + "context" + "errors" + "testing" + + "github.com/milvus-io/milvus-proto/go-api/schemapb" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks" +) + +func TestCoordinatorBroker_GetCollectionSchema(t *testing.T) { + t.Run("got error on DescribeCollection", func(t *testing.T) { + rootCoord := mocks.NewRootCoord(t) + rootCoord.On("DescribeCollection", + mock.Anything, + mock.Anything, + ).Return(nil, errors.New("error mock DescribeCollection")) + ctx := context.Background() + broker := &CoordinatorBroker{rootCoord: rootCoord} + _, err := broker.GetCollectionSchema(ctx, 100) + assert.Error(t, err) + }) + + t.Run("non-success code", func(t *testing.T) { + rootCoord := mocks.NewRootCoord(t) + rootCoord.On("DescribeCollection", + mock.Anything, + mock.Anything, + ).Return(&milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CollectionNotExists}, + }, nil) + ctx := context.Background() + broker := &CoordinatorBroker{rootCoord: rootCoord} + _, err := broker.GetCollectionSchema(ctx, 100) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + rootCoord := mocks.NewRootCoord(t) + rootCoord.On("DescribeCollection", + mock.Anything, + mock.Anything, + ).Return(&milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Schema: &schemapb.CollectionSchema{Name: "test_schema"}, + }, nil) + ctx := context.Background() + broker := &CoordinatorBroker{rootCoord: rootCoord} + schema, err := broker.GetCollectionSchema(ctx, 100) + assert.NoError(t, err) + assert.Equal(t, "test_schema", schema.GetName()) + }) +}