mirror of https://github.com/milvus-io/milvus.git
Check if database exists when Connect was called (#26115)
Signed-off-by: jaime <yun.zhang@zilliz.com>pull/26098/head
parent
517fb95207
commit
6663e753e6
|
@ -48,6 +48,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/crypto"
|
||||
"github.com/milvus-io/milvus/pkg/util/errorutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/logutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
|
@ -5161,8 +5162,50 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest
|
|||
return &milvuspb.ConnectResponse{Status: unhealthyStatus()}, nil
|
||||
}
|
||||
|
||||
db := GetCurDBNameFromContextOrDefault(ctx)
|
||||
logsToBePrinted := append(getLoggerOfClientInfo(request.GetClientInfo()), zap.String("db", db))
|
||||
log := log.Ctx(ctx).With(logsToBePrinted...)
|
||||
|
||||
log.Info("connect received")
|
||||
|
||||
resp, err := node.rootCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases),
|
||||
),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Info("connect failed, failed to list databases", zap.Error(err))
|
||||
return &milvuspb.ConnectResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
log.Info("connect failed, failed to list databases",
|
||||
zap.String("code", resp.GetStatus().GetErrorCode().String()),
|
||||
zap.String("reason", resp.GetStatus().GetReason()))
|
||||
return &milvuspb.ConnectResponse{
|
||||
Status: proto.Clone(resp.GetStatus()).(*commonpb.Status),
|
||||
}, nil
|
||||
}
|
||||
|
||||
if !funcutil.SliceContain(resp.GetDbNames(), db) {
|
||||
log.Info("connect failed, target database not exist")
|
||||
return &milvuspb.ConnectResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError, // DatabaseNotExist?
|
||||
Reason: fmt.Sprintf("database not found: %s", db),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
ts, err := node.tsoAllocator.AllocOne(ctx)
|
||||
if err != nil {
|
||||
log.Info("connect failed, failed to allocate timestamp", zap.Error(err))
|
||||
return &milvuspb.ConnectResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
|
@ -585,7 +586,80 @@ func TestProxy_Connect(t *testing.T) {
|
|||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("failed to list database", func(t *testing.T) {
|
||||
r := mocks.NewRootCoord(t)
|
||||
r.On("ListDatabases",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(nil, errors.New("error mock ListDatabases"))
|
||||
|
||||
node := &Proxy{rootCoord: r}
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
||||
resp, err := node.Connect(context.TODO(), nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("list database error", func(t *testing.T) {
|
||||
r := mocks.NewRootCoord(t)
|
||||
r.On("ListDatabases",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(&milvuspb.ListDatabasesResponse{
|
||||
Status: unhealthyStatus(),
|
||||
}, nil)
|
||||
|
||||
node := &Proxy{rootCoord: r}
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
||||
resp, err := node.Connect(context.TODO(), nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("database not found", func(t *testing.T) {
|
||||
md := metadata.New(map[string]string{
|
||||
"dbName": "20230525",
|
||||
})
|
||||
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
||||
|
||||
r := mocks.NewRootCoord(t)
|
||||
r.On("ListDatabases",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(&milvuspb.ListDatabasesResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
DbNames: []string{},
|
||||
}, nil)
|
||||
|
||||
node := &Proxy{rootCoord: r}
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
||||
resp, err := node.Connect(ctx, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("failed to allocate ts", func(t *testing.T) {
|
||||
md := metadata.New(map[string]string{
|
||||
"dbName": "20230525",
|
||||
})
|
||||
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
||||
|
||||
r := mocks.NewRootCoord(t)
|
||||
r.On("ListDatabases",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(&milvuspb.ListDatabasesResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
DbNames: []string{"20230525"},
|
||||
}, nil)
|
||||
|
||||
m := newMockTimestampAllocator(t)
|
||||
m.On("AllocTimestamp",
|
||||
mock.Anything,
|
||||
|
@ -594,14 +668,31 @@ func TestProxy_Connect(t *testing.T) {
|
|||
alloc, _ := newTimestampAllocator(m, 199)
|
||||
node := Proxy{
|
||||
tsoAllocator: alloc,
|
||||
rootCoord: r,
|
||||
}
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
resp, err := node.Connect(context.TODO(), nil)
|
||||
resp, err := node.Connect(ctx, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("normal case", func(t *testing.T) {
|
||||
md := metadata.New(map[string]string{
|
||||
"dbName": "20230525",
|
||||
})
|
||||
ctx := metadata.NewIncomingContext(context.TODO(), md)
|
||||
|
||||
r := mocks.NewRootCoord(t)
|
||||
r.On("ListDatabases",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(&milvuspb.ListDatabasesResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
DbNames: []string{"20230525"},
|
||||
}, nil)
|
||||
|
||||
m := newMockTimestampAllocator(t)
|
||||
m.On("AllocTimestamp",
|
||||
mock.Anything,
|
||||
|
@ -616,9 +707,10 @@ func TestProxy_Connect(t *testing.T) {
|
|||
alloc, _ := newTimestampAllocator(m, 199)
|
||||
node := Proxy{
|
||||
tsoAllocator: alloc,
|
||||
rootCoord: r,
|
||||
}
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
resp, err := node.Connect(context.TODO(), &milvuspb.ConnectRequest{
|
||||
resp, err := node.Connect(ctx, &milvuspb.ConnectRequest{
|
||||
ClientInfo: &commonpb.ClientInfo{},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
|
Loading…
Reference in New Issue