Refine Proxy errors (#27499)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/27534/head
yah01 2023-10-09 10:09:33 +08:00 committed by GitHub
parent 56c94cdfa7
commit 3759857bc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 314 additions and 424 deletions

File diff suppressed because it is too large Load Diff

View File

@ -49,7 +49,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {
chMgr.EXPECT().removeDMLStream(mock.Anything).Return()
node := &Proxy{chMgr: chMgr}
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
req := &proxypb.InvalidateCollMetaCacheRequest{
@ -65,7 +65,7 @@ func TestProxy_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -83,7 +83,7 @@ func TestProxy_CheckHealth(t *testing.T) {
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -116,7 +116,7 @@ func TestProxy_CheckHealth(t *testing.T) {
dataCoord: dataCoordMock,
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -133,7 +133,7 @@ func TestProxy_CheckHealth(t *testing.T) {
queryCoord: qc,
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
@ -160,20 +160,20 @@ func TestProxy_CheckHealth(t *testing.T) {
func TestProxyRenameCollection(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
})
t.Run("rename with illegal new collection name", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetErrorCode())
assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid)
})
t.Run("rename fail", func(t *testing.T) {
@ -184,7 +184,7 @@ func TestProxyRenameCollection(t *testing.T) {
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: rc,
}
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "new"})
@ -200,7 +200,7 @@ func TestProxyRenameCollection(t *testing.T) {
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
rootCoord: rc,
}
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "new"})
@ -216,7 +216,7 @@ func TestProxy_ResourceGroup(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
qc := mocks.NewMockQueryCoordClient(t)
node.SetQueryCoordClient(qc)
@ -308,7 +308,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
qc := mocks.NewMockQueryCoordClient(t)
node.SetQueryCoordClient(qc)
@ -329,7 +329,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
ResourceGroup: "...",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid)
})
t.Run("drop resource group", func(t *testing.T) {
@ -347,7 +347,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
NumNode: 1,
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid)
})
t.Run("transfer replica", func(t *testing.T) {
@ -358,7 +358,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {
CollectionName: "collection1",
})
assert.NoError(t, err)
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument)
assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid)
})
}
@ -398,7 +398,7 @@ func TestProxy_FlushAll_DbCollection(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
@ -437,7 +437,7 @@ func TestProxy_FlushAll(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
@ -483,11 +483,11 @@ func TestProxy_FlushAll(t *testing.T) {
})
t.Run("FlushAll failed, server is abnormal", func(t *testing.T) {
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := node.FlushAll(ctx, &milvuspb.FlushAllRequest{})
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
})
t.Run("FlushAll failed, get id failed", func(t *testing.T) {
@ -557,7 +557,7 @@ func TestProxy_GetFlushAllState(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
@ -576,11 +576,11 @@ func TestProxy_GetFlushAllState(t *testing.T) {
})
t.Run("GetFlushAllState failed, server is abnormal", func(t *testing.T) {
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := node.GetFlushAllState(ctx, &milvuspb.GetFlushAllStateRequest{})
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
})
t.Run("DataCoord GetFlushAllState failed", func(t *testing.T) {
@ -604,7 +604,7 @@ func TestProxy_GetFlushState(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
@ -623,11 +623,11 @@ func TestProxy_GetFlushState(t *testing.T) {
})
t.Run("GetFlushState failed, server is abnormal", func(t *testing.T) {
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := node.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{})
assert.NoError(t, err)
assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_NotReadyServe)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
})
t.Run("GetFlushState with collection name", func(t *testing.T) {
@ -635,7 +635,7 @@ func TestProxy_GetFlushState(t *testing.T) {
CollectionName: "*",
})
assert.NoError(t, err)
assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
cacheBak := globalMetaCache
defer func() { globalMetaCache = cacheBak }()
@ -684,7 +684,7 @@ func TestProxy_GetReplicas(t *testing.T) {
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
@ -705,13 +705,13 @@ func TestProxy_GetReplicas(t *testing.T) {
})
t.Run("proxy_not_healthy", func(t *testing.T) {
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := node.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
CollectionID: 1000,
})
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
})
t.Run("QueryCoordClient_returnsError", func(t *testing.T) {
@ -757,7 +757,7 @@ func TestProxy_Connect(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(&milvuspb.ListDatabasesResponse{
Status: unhealthyStatus(),
Status: merr.Status(merr.WrapErrServiceNotReady("initialization")),
}, nil)
node := &Proxy{rootCoord: r}
@ -885,11 +885,11 @@ func TestProxyCreateDatabase(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
})
factory := dependency.NewDefaultFactory(true)
@ -901,7 +901,7 @@ func TestProxyCreateDatabase(t *testing.T) {
tso: newMockTimestampAllocatorInterface(),
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
assert.NoError(t, err)
@ -925,7 +925,7 @@ func TestProxyCreateDatabase(t *testing.T) {
rc.On("CreateDatabase", mock.Anything, mock.Anything).
Return(merr.Status(nil), nil)
node.rootCoord = rc
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{DbName: "db"})
@ -939,11 +939,11 @@ func TestProxyDropDatabase(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
})
factory := dependency.NewDefaultFactory(true)
@ -955,7 +955,7 @@ func TestProxyDropDatabase(t *testing.T) {
tso: newMockTimestampAllocatorInterface(),
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
assert.NoError(t, err)
@ -979,7 +979,7 @@ func TestProxyDropDatabase(t *testing.T) {
rc.On("DropDatabase", mock.Anything, mock.Anything).
Return(merr.Status(nil), nil)
node.rootCoord = rc
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{DbName: "db"})
@ -993,7 +993,7 @@ func TestProxyListDatabase(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.stateCode.Store(commonpb.StateCode_Abnormal)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})
assert.NoError(t, err)
@ -1009,7 +1009,7 @@ func TestProxyListDatabase(t *testing.T) {
tso: newMockTimestampAllocatorInterface(),
}
node.multiRateLimiter = NewMultiRateLimiter()
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
assert.NoError(t, err)
@ -1035,7 +1035,7 @@ func TestProxyListDatabase(t *testing.T) {
Status: merr.Status(nil),
}, nil)
node.rootCoord = rc
node.stateCode.Store(commonpb.StateCode_Healthy)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})

View File

@ -23,12 +23,12 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/cockroachdb/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -76,7 +76,7 @@ type Proxy struct {
ip string
port int
stateCode atomic.Value
stateCode atomic.Int32
etcdCli *clientv3.Client
address string
@ -135,6 +135,15 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
return node, nil
}
// UpdateStateCode updates the state code of Proxy.
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
node.stateCode.Store(int32(code))
}
func (node *Proxy) GetStateCode() commonpb.StateCode {
return commonpb.StateCode(node.stateCode.Load())
}
// Register registers proxy at etcd
func (node *Proxy) Register() error {
node.session.Register()

View File

@ -56,7 +56,7 @@ func TestProxyRpcLimit(t *testing.T) {
defer testServer.grpcServer.Stop()
client, err := grpcproxyclient.NewClient(ctx, "localhost:"+p.Port.GetValue(), 1)
assert.NoError(t, err)
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
rates := make([]*internalpb.Rate, 0)

View File

@ -479,7 +479,7 @@ func TestProxy(t *testing.T) {
err = proxy.Start()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, proxy.stateCode.Load().(commonpb.StateCode))
assert.Equal(t, commonpb.StateCode_Healthy, proxy.GetStateCode())
// register proxy
err = proxy.Register()
@ -496,7 +496,7 @@ func TestProxy(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, states.GetStatus().GetErrorCode())
assert.Equal(t, paramtable.GetNodeID(), states.State.NodeID)
assert.Equal(t, typeutil.ProxyRole, states.State.Role)
assert.Equal(t, proxy.stateCode.Load().(commonpb.StateCode), states.State.StateCode)
assert.Equal(t, proxy.GetStateCode(), states.State.StateCode)
})
t.Run("get statistics channel", func(t *testing.T) {
@ -1608,7 +1608,7 @@ func TestProxy(t *testing.T) {
CollectionName: collectionName,
Files: []string{"f1.json"},
}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.NoError(t, err)
@ -1623,7 +1623,7 @@ func TestProxy(t *testing.T) {
CollectionName: "bad_collection_name",
Files: []string{"f1.json"},
}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
@ -1636,7 +1636,7 @@ func TestProxy(t *testing.T) {
CollectionName: "bad_collection_name",
Files: []string{"f1.json"},
}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
@ -2295,7 +2295,7 @@ func TestProxy(t *testing.T) {
// proxy unhealthy
//
//notStateCode := "not state code"
//proxy.stateCode.Store(notStateCode)
//proxy.UpdateStateCode(notStateCode)
//
//t.Run("GetComponentStates fail", func(t *testing.T) {
// _, err := proxy.GetComponentStates(ctx)
@ -3942,14 +3942,16 @@ func testProxyRefreshPolicyInfoCache(ctx context.Context, t *testing.T, proxy *P
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
_, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{})
assert.Error(t, err)
resp, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{})
assert.NoError(t, err)
assert.Error(t, merr.Error(resp))
_, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
resp, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{
OpType: 100,
OpKey: funcutil.EncodeUserRoleCache("foo", "public"),
})
assert.Error(t, err)
assert.NoError(t, err)
assert.Error(t, merr.Error(resp))
})
wg.Wait()
}
@ -3976,7 +3978,7 @@ func Test_GetCompactionState(t *testing.T) {
t.Run("get compaction state", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.GetCompactionState(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.GetCompactionStateResponse{}, resp)
assert.NoError(t, err)
@ -3985,7 +3987,7 @@ func Test_GetCompactionState(t *testing.T) {
t.Run("get compaction state with unhealthy proxy", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Abnormal)
proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := proxy.GetCompactionState(context.TODO(), nil)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
assert.NoError(t, err)
@ -3996,7 +3998,7 @@ func Test_ManualCompaction(t *testing.T) {
t.Run("test manual compaction", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.ManualCompaction(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.ManualCompactionResponse{}, resp)
assert.NoError(t, err)
@ -4004,7 +4006,7 @@ func Test_ManualCompaction(t *testing.T) {
t.Run("test manual compaction with unhealthy", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Abnormal)
proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := proxy.ManualCompaction(context.TODO(), nil)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
assert.NoError(t, err)
@ -4015,7 +4017,7 @@ func Test_GetCompactionStateWithPlans(t *testing.T) {
t.Run("test get compaction state with plans", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.GetCompactionStateWithPlans(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.GetCompactionPlansResponse{}, resp)
assert.NoError(t, err)
@ -4023,7 +4025,7 @@ func Test_GetCompactionStateWithPlans(t *testing.T) {
t.Run("test get compaction state with plans with unhealthy proxy", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Abnormal)
proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := proxy.GetCompactionStateWithPlans(context.TODO(), nil)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
assert.NoError(t, err)
@ -4046,7 +4048,7 @@ func Test_GetFlushState(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{
CollectionName: "coll",
})
@ -4057,7 +4059,7 @@ func Test_GetFlushState(t *testing.T) {
t.Run("test get flush state with unhealthy proxy", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(commonpb.StateCode_Abnormal)
proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := proxy.GetFlushState(context.TODO(), nil)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
assert.NoError(t, err)
@ -4066,7 +4068,7 @@ func Test_GetFlushState(t *testing.T) {
func TestProxy_GetComponentStates(t *testing.T) {
n := &Proxy{}
n.stateCode.Store(commonpb.StateCode_Healthy)
n.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := n.GetComponentStates(context.Background(), nil)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
@ -4078,14 +4080,6 @@ func TestProxy_GetComponentStates(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
}
func TestProxy_GetComponentStates_state_code(t *testing.T) {
p := &Proxy{}
p.stateCode.Store("not commonpb.StateCode")
states, err := p.GetComponentStates(context.Background(), nil)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, states.GetStatus().GetErrorCode())
}
func TestProxy_Import(t *testing.T) {
var wg sync.WaitGroup
@ -4182,7 +4176,7 @@ func TestProxy_GetImportState(t *testing.T) {
rootCoord.state.Store(commonpb.StateCode_Healthy)
t.Run("test get import state", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.GetImportState(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
@ -4190,7 +4184,7 @@ func TestProxy_GetImportState(t *testing.T) {
})
t.Run("test get import state with unhealthy", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
proxy.stateCode.Store(commonpb.StateCode_Abnormal)
proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := proxy.GetImportState(context.TODO(), req)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
assert.NoError(t, err)
@ -4203,7 +4197,7 @@ func TestProxy_ListImportTasks(t *testing.T) {
rootCoord.state.Store(commonpb.StateCode_Healthy)
t.Run("test list import tasks", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := proxy.ListImportTasks(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
@ -4211,7 +4205,7 @@ func TestProxy_ListImportTasks(t *testing.T) {
})
t.Run("test list import tasks with unhealthy", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
proxy.stateCode.Store(commonpb.StateCode_Abnormal)
proxy.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := proxy.ListImportTasks(context.TODO(), req)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
assert.NoError(t, err)
@ -4248,7 +4242,7 @@ func TestProxy_GetLoadState(t *testing.T) {
InMemoryPercentages: []int64{},
}, nil)
proxy := &Proxy{queryCoord: qc}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"})
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(stateResp.GetStatus()), merr.ErrServiceNotReady)
@ -4263,7 +4257,7 @@ func TestProxy_GetLoadState(t *testing.T) {
qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, merr.WrapErrCollectionNotLoaded("foo"))
qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, merr.WrapErrPartitionNotLoaded("p1"))
proxy := &Proxy{queryCoord: qc}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"})
assert.NoError(t, err)
@ -4304,7 +4298,7 @@ func TestProxy_GetLoadState(t *testing.T) {
InMemoryPercentages: []int64{100},
}, nil)
proxy := &Proxy{queryCoord: qc}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo", Base: &commonpb.MsgBase{}})
assert.NoError(t, err)
@ -4313,7 +4307,7 @@ func TestProxy_GetLoadState(t *testing.T) {
stateResp, err = proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: ""})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stateResp.GetStatus().GetErrorCode())
assert.ErrorIs(t, merr.Error(stateResp.GetStatus()), merr.ErrParameterInvalid)
progressResp, err := proxy.GetLoadingProgress(context.Background(), &milvuspb.GetLoadingProgressRequest{CollectionName: "foo"})
assert.NoError(t, err)
@ -4339,7 +4333,7 @@ func TestProxy_GetLoadState(t *testing.T) {
InMemoryPercentages: []int64{50},
}, nil)
proxy := &Proxy{queryCoord: qc}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"})
assert.NoError(t, err)
@ -4373,7 +4367,7 @@ func TestProxy_GetLoadState(t *testing.T) {
Status: merr.Status(mockErr),
}, nil)
proxy := &Proxy{queryCoord: qc}
proxy.stateCode.Store(commonpb.StateCode_Healthy)
proxy.UpdateStateCode(commonpb.StateCode_Healthy)
stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"})
assert.NoError(t, err)

View File

@ -63,7 +63,6 @@ func (cdt *createDatabaseTask) PreExecute(ctx context.Context) error {
func (cdt *createDatabaseTask) Execute(ctx context.Context) error {
var err error
cdt.result = &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
cdt.result, err = cdt.rootCoord.CreateDatabase(ctx, cdt.CreateDatabaseRequest)
return err
}
@ -125,7 +124,6 @@ func (ddt *dropDatabaseTask) PreExecute(ctx context.Context) error {
func (ddt *dropDatabaseTask) Execute(ctx context.Context) error {
var err error
ddt.result = &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
ddt.result, err = ddt.rootCoord.DropDatabase(ctx, ddt.DropDatabaseRequest)
if ddt.result != nil && ddt.result.ErrorCode == commonpb.ErrorCode_Success {
@ -191,11 +189,6 @@ func (ldt *listDatabaseTask) PreExecute(ctx context.Context) error {
func (ldt *listDatabaseTask) Execute(ctx context.Context) error {
var err error
ldt.result = &milvuspb.ListDatabasesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
ldt.result, err = ldt.rootCoord.ListDatabases(ctx, ldt.ListDatabasesRequest)
return err
}

View File

@ -639,10 +639,7 @@ func (t *searchTask) Requery() error {
func (t *searchTask) fillInEmptyResult(numQueries int64) {
t.result = &milvuspb.SearchResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "search result is empty",
},
Status: merr.Success("search result is empty"),
CollectionName: t.collectionName,
Results: &schemapb.SearchResultData{
NumQueries: numQueries,

View File

@ -127,24 +127,24 @@ func validateCollectionNameOrAlias(entity, entityType string) error {
entity = strings.TrimSpace(entity)
if entity == "" {
return fmt.Errorf("collection %s should not be empty", entityType)
return merr.WrapErrParameterInvalidMsg("collection %s should not be empty", entityType)
}
invalidMsg := fmt.Sprintf("Invalid collection %s: %s. ", entityType, entity)
if len(entity) > Params.ProxyCfg.MaxNameLength.GetAsInt() {
return fmt.Errorf("%s the length of a collection %s must be less than %s characters", invalidMsg, entityType,
return merr.WrapErrParameterInvalidMsg("%s the length of a collection %s must be less than %s characters", invalidMsg, entityType,
Params.ProxyCfg.MaxNameLength.GetValue())
}
firstChar := entity[0]
if firstChar != '_' && !isAlpha(firstChar) {
return fmt.Errorf("%s the first character of a collection %s must be an underscore or letter", invalidMsg, entityType)
return merr.WrapErrParameterInvalidMsg("%s the first character of a collection %s must be an underscore or letter", invalidMsg, entityType)
}
for i := 1; i < len(entity); i++ {
c := entity[i]
if c != '_' && !isAlpha(c) && !isNumber(c) {
return fmt.Errorf("%s collection %s can only contain numbers, letters and underscores", invalidMsg, entityType)
return merr.WrapErrParameterInvalidMsg("%s collection %s can only contain numbers, letters and underscores", invalidMsg, entityType)
}
}
return nil
@ -157,19 +157,19 @@ func ValidateResourceGroupName(entity string) error {
invalidMsg := fmt.Sprintf("Invalid resource group name %s.", entity)
if len(entity) > Params.ProxyCfg.MaxNameLength.GetAsInt() {
return fmt.Errorf("%s the length of a resource group name must be less than %s characters",
return merr.WrapErrParameterInvalidMsg("%s the length of a resource group name must be less than %s characters",
invalidMsg, Params.ProxyCfg.MaxNameLength.GetValue())
}
firstChar := entity[0]
if firstChar != '_' && !isAlpha(firstChar) {
return fmt.Errorf("%s the first character of a resource group name must be an underscore or letter", invalidMsg)
return merr.WrapErrParameterInvalidMsg("%s the first character of a resource group name must be an underscore or letter", invalidMsg)
}
for i := 1; i < len(entity); i++ {
c := entity[i]
if c != '_' && !isAlpha(c) && !isNumber(c) {
return fmt.Errorf("%s resource group name can only contain numbers, letters and underscores", invalidMsg)
return merr.WrapErrParameterInvalidMsg("%s resource group name can only contain numbers, letters and underscores", invalidMsg)
}
}
return nil
@ -722,27 +722,23 @@ func ValidateUsername(username string) error {
username = strings.TrimSpace(username)
if username == "" {
return errors.New("username should not be empty")
return merr.WrapErrParameterInvalidMsg("username must be not empty")
}
invalidMsg := "Invalid username: " + username + ". "
if len(username) > Params.ProxyCfg.MaxUsernameLength.GetAsInt() {
msg := invalidMsg + "The length of username must be less than " + Params.ProxyCfg.MaxUsernameLength.GetValue() + " characters."
return errors.New(msg)
return merr.WrapErrParameterInvalidMsg("invalid username %s with length %d, the length of username must be less than %d", username, len(username), Params.ProxyCfg.MaxUsernameLength.GetValue())
}
firstChar := username[0]
if !isAlpha(firstChar) {
msg := invalidMsg + "The first character of username must be a letter."
return errors.New(msg)
return merr.WrapErrParameterInvalidMsg("invalid user name %s, the first character must be a letter, but got %s", username, firstChar)
}
usernameSize := len(username)
for i := 1; i < usernameSize; i++ {
c := username[i]
if c != '_' && !isAlpha(c) && !isNumber(c) {
msg := invalidMsg + "Username should only contain numbers, letters, and underscores."
return errors.New(msg)
return merr.WrapErrParameterInvalidMsg("invalid user name %s, username must contain only numbers, letters and underscores, but got %s", username, c)
}
}
return nil
@ -750,9 +746,9 @@ func ValidateUsername(username string) error {
func ValidatePassword(password string) error {
if len(password) < Params.ProxyCfg.MinPasswordLength.GetAsInt() || len(password) > Params.ProxyCfg.MaxPasswordLength.GetAsInt() {
msg := "The length of password must be great than " + Params.ProxyCfg.MinPasswordLength.GetValue() +
" and less than " + Params.ProxyCfg.MaxPasswordLength.GetValue() + " characters."
return errors.New(msg)
return merr.WrapErrParameterInvalidRange(Params.ProxyCfg.MinPasswordLength.GetAsInt(),
Params.ProxyCfg.MaxPasswordLength.GetAsInt(),
len(password), "invalid password length")
}
return nil
}

View File

@ -105,6 +105,12 @@ var (
ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false)
ErrMqInternal = newMilvusError("message queue internal error", 1302, false)
// Privilege related
// this operation is denied because the user not authorized, user need to login in first
ErrPrivilegeNotAuthenticated = newMilvusError("not authenticated", 1400, false)
// this operation is denied because the user has no permission to do this, user need higher privilege
ErrPrivilegeNotPermitted = newMilvusError("privilege not permitted", 1401, false)
// field related
ErrFieldNotFound = newMilvusError("field not found", 1700, false)

View File

@ -18,7 +18,6 @@ package merr
import (
"context"
"fmt"
"strings"
"github.com/cockroachdb/errors"
@ -94,6 +93,13 @@ func CheckRPCCall(resp any, err error) error {
return nil
}
func Success(reason ...string) *commonpb.Status {
status := Status(nil)
// NOLINT
status.Reason = strings.Join(reason, " ")
return status
}
// Deprecated
func StatusWithErrorCode(err error, code commonpb.ErrorCode) *commonpb.Status {
if err == nil {
@ -198,7 +204,7 @@ func Error(status *commonpb.Status) error {
// use code first
code := status.GetCode()
if code == 0 {
return newMilvusError(fmt.Sprintf("legacy error code:%d, reason: %s", status.GetErrorCode(), status.GetReason()), errUnexpected.errCode, false)
return newMilvusError(status.GetReason(), errUnexpected.errCode, false)
}
return newMilvusError(status.GetReason(), code, code&retryableFlag != 0)
@ -643,6 +649,16 @@ func WrapErrMqInternal(err error, msg ...string) error {
return err
}
func WrapErrPrivilegeNotAuthenticated(fmt string, args ...any) error {
err := errors.Wrapf(ErrPrivilegeNotAuthenticated, fmt, args...)
return err
}
func WrapErrPrivilegeNotPermitted(fmt string, args ...any) error {
err := errors.Wrapf(ErrPrivilegeNotPermitted, fmt, args...)
return err
}
// Segcore related
func WrapErrSegcore(code int32, msg ...string) error {
err := errors.Wrapf(ErrSegcore, "internal code=%v", code)