Fix Has Collection Failed on datacoord (#23710)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/23617/head
Xiaofan 2023-04-26 18:50:34 -07:00 committed by GitHub
parent 353305c091
commit 016311ad48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 41 additions and 31 deletions

View File

@ -338,7 +338,7 @@ func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID
}
hasCollection = has
return nil
}, retry.Attempts(100000)); err != nil {
}, retry.Attempts(500)); err != nil {
log.Error("datacoord ServerHandler HasCollection finally failed")
panic("datacoord ServerHandler HasCollection finally failed")
}

View File

@ -18,6 +18,7 @@ package datacoord
import (
"context"
"fmt"
"sync/atomic"
"time"
@ -35,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -448,6 +450,14 @@ func (m *mockRootCoordService) HasCollection(ctx context.Context, req *milvuspb.
}
func (m *mockRootCoordService) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
// return not exist
if req.CollectionID == -1 {
err := common.NewCollectionNotExistError(fmt.Sprintf("can't find collection: %d", req.CollectionID))
return &milvuspb.DescribeCollectionResponse{
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error()},
}, nil
}
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,

View File

@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -1035,33 +1036,11 @@ func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, e
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
return true, nil
}
if resp.Status.ErrorCode == commonpb.ErrorCode_CollectionNotExists {
statusErr := common.NewStatusError(resp.Status.ErrorCode, resp.Status.Reason)
if common.IsCollectionNotExistError(statusErr) {
return false, nil
}
return false, fmt.Errorf("code:%s, reason:%s", resp.Status.GetErrorCode().String(), resp.Status.GetReason())
}
// hasCollectionInternal communicates with RootCoord and check whether this collection's meta exist in rootcoord.
func (s *Server) hasCollectionInternal(ctx context.Context, collectionID int64) (bool, error) {
resp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
DbName: "",
CollectionID: collectionID,
})
if err != nil {
return false, err
}
if resp == nil {
return false, errNilResponse
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return false, nil
}
return true, nil
return false, statusErr
}
func (s *Server) reCollectSegmentStats(ctx context.Context) {

View File

@ -2079,6 +2079,24 @@ func TestGetChannelSeekPosition(t *testing.T) {
}
}
func TestDescribeCollection(t *testing.T) {
t.Run("TestNotExistCollections", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
has, err := svr.handler.(*ServerHandler).HasCollection(context.TODO(), -1)
assert.NoError(t, err)
assert.False(t, has)
})
t.Run("TestExistCollections", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
has, err := svr.handler.(*ServerHandler).HasCollection(context.TODO(), 1314)
assert.NoError(t, err)
assert.True(t, has)
})
}
func TestGetDataVChanPositions(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

View File

@ -18,6 +18,7 @@ package common
import (
"fmt"
"strings"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
@ -112,10 +113,12 @@ func IsCollectionNotExistError(e error) bool {
}
// cycle import: common -> funcutil -> types -> sessionutil -> common
// return funcutil.SliceContain(collectionNotExistCodes, statusError.GetErrorCode())
for _, code := range collectionNotExistCodes {
if code == statusError.GetErrorCode() {
return true
}
if statusError.Status.ErrorCode == commonpb.ErrorCode_CollectionNotExists {
return true
}
if (statusError.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError) && strings.Contains(statusError.Status.Reason, "can't find collection") {
return true
}
return false
}

View File

@ -55,7 +55,7 @@ func Test_IsCollectionNotExistError(t *testing.T) {
assert.False(t, IsCollectionNotExistError(nil))
assert.False(t, IsCollectionNotExistError(errors.New("not status error")))
for _, code := range collectionNotExistCodes {
err := NewStatusError(code, "collection not exist")
err := NewStatusError(code, "can't find collection")
assert.True(t, IsCollectionNotExistError(err))
}
assert.True(t, IsCollectionNotExistError(NewCollectionNotExistError("collection not exist")))