mirror of https://github.com/milvus-io/milvus.git
enhance: Add management api to check querycoord balance status (#37784)
issue: #37783 Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/39660/head
parent
b9e3ec7175
commit
bfc802297e
|
@ -472,6 +472,17 @@ func (c *Client) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRe
|
|||
})
|
||||
}
|
||||
|
||||
func (c *Client) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest, opts ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
req = typeutil.Clone(req)
|
||||
commonpbutil.UpdateMsgBase(
|
||||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
return client.CheckBalanceStatus(ctx, req)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
req = typeutil.Clone(req)
|
||||
commonpbutil.UpdateMsgBase(
|
||||
|
|
|
@ -185,6 +185,9 @@ func Test_NewClient(t *testing.T) {
|
|||
|
||||
r39, err := client.CheckQueryNodeDistribution(ctx, nil)
|
||||
retCheck(retNotNil, r39, err)
|
||||
|
||||
r40, err := client.CheckBalanceStatus(ctx, nil)
|
||||
retCheck(retNotNil, r40, err)
|
||||
}
|
||||
|
||||
client.(*Client).grpcClient = &mock.GRPCClientBase[querypb.QueryCoordClient]{
|
||||
|
|
|
@ -457,6 +457,10 @@ func (s *Server) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRe
|
|||
return s.queryCoord.ResumeBalance(ctx, req)
|
||||
}
|
||||
|
||||
func (s *Server) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
return s.queryCoord.CheckBalanceStatus(ctx, req)
|
||||
}
|
||||
|
||||
func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) {
|
||||
return s.queryCoord.SuspendNode(ctx, req)
|
||||
}
|
||||
|
|
|
@ -311,6 +311,14 @@ func Test_NewServer(t *testing.T) {
|
|||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("CheckBalanceStatus", func(t *testing.T) {
|
||||
req := &querypb.CheckBalanceStatusRequest{}
|
||||
mqc.EXPECT().CheckBalanceStatus(mock.Anything, req).Return(&querypb.CheckBalanceStatusResponse{Status: merr.Success()}, nil)
|
||||
resp, err := server.CheckBalanceStatus(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("SuspendNode", func(t *testing.T) {
|
||||
req := &querypb.SuspendNodeRequest{}
|
||||
mqc.EXPECT().SuspendNode(mock.Anything, req).Return(merr.Success(), nil)
|
||||
|
|
|
@ -53,6 +53,7 @@ const (
|
|||
|
||||
RouteSuspendQueryCoordBalance = "/management/querycoord/balance/suspend"
|
||||
RouteResumeQueryCoordBalance = "/management/querycoord/balance/resume"
|
||||
RouteQueryCoordBalanceStatus = "/management/querycoord/balance/status"
|
||||
RouteTransferSegment = "/management/querycoord/transfer/segment"
|
||||
RouteTransferChannel = "/management/querycoord/transfer/channel"
|
||||
|
||||
|
|
|
@ -93,6 +93,65 @@ func (_c *MockQueryCoord_ActivateChecker_Call) RunAndReturn(run func(context.Con
|
|||
return _c
|
||||
}
|
||||
|
||||
// CheckBalanceStatus provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockQueryCoord) CheckBalanceStatus(_a0 context.Context, _a1 *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for CheckBalanceStatus")
|
||||
}
|
||||
|
||||
var r0 *querypb.CheckBalanceStatusResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error)); ok {
|
||||
return rf(_a0, _a1)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *querypb.CheckBalanceStatusRequest) *querypb.CheckBalanceStatusResponse); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*querypb.CheckBalanceStatusResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *querypb.CheckBalanceStatusRequest) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockQueryCoord_CheckBalanceStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckBalanceStatus'
|
||||
type MockQueryCoord_CheckBalanceStatus_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckBalanceStatus is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *querypb.CheckBalanceStatusRequest
|
||||
func (_e *MockQueryCoord_Expecter) CheckBalanceStatus(_a0 interface{}, _a1 interface{}) *MockQueryCoord_CheckBalanceStatus_Call {
|
||||
return &MockQueryCoord_CheckBalanceStatus_Call{Call: _e.mock.On("CheckBalanceStatus", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *MockQueryCoord_CheckBalanceStatus_Call) Run(run func(_a0 context.Context, _a1 *querypb.CheckBalanceStatusRequest)) *MockQueryCoord_CheckBalanceStatus_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*querypb.CheckBalanceStatusRequest))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryCoord_CheckBalanceStatus_Call) Return(_a0 *querypb.CheckBalanceStatusResponse, _a1 error) *MockQueryCoord_CheckBalanceStatus_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryCoord_CheckBalanceStatus_Call) RunAndReturn(run func(context.Context, *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error)) *MockQueryCoord_CheckBalanceStatus_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: _a0, _a1
|
||||
func (_m *MockQueryCoord) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
|
|
@ -105,6 +105,80 @@ func (_c *MockQueryCoordClient_ActivateChecker_Call) RunAndReturn(run func(conte
|
|||
return _c
|
||||
}
|
||||
|
||||
// CheckBalanceStatus provides a mock function with given fields: ctx, in, opts
|
||||
func (_m *MockQueryCoordClient) CheckBalanceStatus(ctx context.Context, in *querypb.CheckBalanceStatusRequest, opts ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
for _i := range opts {
|
||||
_va[_i] = opts[_i]
|
||||
}
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, ctx, in)
|
||||
_ca = append(_ca, _va...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for CheckBalanceStatus")
|
||||
}
|
||||
|
||||
var r0 *querypb.CheckBalanceStatusResponse
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *querypb.CheckBalanceStatusRequest, ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error)); ok {
|
||||
return rf(ctx, in, opts...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *querypb.CheckBalanceStatusRequest, ...grpc.CallOption) *querypb.CheckBalanceStatusResponse); ok {
|
||||
r0 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*querypb.CheckBalanceStatusResponse)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *querypb.CheckBalanceStatusRequest, ...grpc.CallOption) error); ok {
|
||||
r1 = rf(ctx, in, opts...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockQueryCoordClient_CheckBalanceStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckBalanceStatus'
|
||||
type MockQueryCoordClient_CheckBalanceStatus_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CheckBalanceStatus is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - in *querypb.CheckBalanceStatusRequest
|
||||
// - opts ...grpc.CallOption
|
||||
func (_e *MockQueryCoordClient_Expecter) CheckBalanceStatus(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_CheckBalanceStatus_Call {
|
||||
return &MockQueryCoordClient_CheckBalanceStatus_Call{Call: _e.mock.On("CheckBalanceStatus",
|
||||
append([]interface{}{ctx, in}, opts...)...)}
|
||||
}
|
||||
|
||||
func (_c *MockQueryCoordClient_CheckBalanceStatus_Call) Run(run func(ctx context.Context, in *querypb.CheckBalanceStatusRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_CheckBalanceStatus_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]grpc.CallOption, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(grpc.CallOption)
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(*querypb.CheckBalanceStatusRequest), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryCoordClient_CheckBalanceStatus_Call) Return(_a0 *querypb.CheckBalanceStatusResponse, _a1 error) *MockQueryCoordClient_CheckBalanceStatus_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockQueryCoordClient_CheckBalanceStatus_Call) RunAndReturn(run func(context.Context, *querypb.CheckBalanceStatusRequest, ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error)) *MockQueryCoordClient_CheckBalanceStatus_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// CheckHealth provides a mock function with given fields: ctx, in, opts
|
||||
func (_m *MockQueryCoordClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
|
||||
_va := make([]interface{}, len(opts))
|
||||
|
|
|
@ -82,6 +82,10 @@ func RegisterMgrRoute(proxy *Proxy) {
|
|||
Path: management.RouteCheckQueryNodeDistribution,
|
||||
HandlerFunc: proxy.CheckQueryNodeDistribution,
|
||||
})
|
||||
management.Register(&management.Handler{
|
||||
Path: management.RouteQueryCoordBalanceStatus,
|
||||
HandlerFunc: proxy.CheckQueryCoordBalanceStatus,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -249,6 +253,29 @@ func (node *Proxy) ResumeQueryCoordBalance(w http.ResponseWriter, req *http.Requ
|
|||
w.Write([]byte(`{"msg": "OK"}`))
|
||||
}
|
||||
|
||||
func (node *Proxy) CheckQueryCoordBalanceStatus(w http.ResponseWriter, req *http.Request) {
|
||||
resp, err := node.queryCoord.CheckBalanceStatus(req.Context(), &querypb.CheckBalanceStatusRequest{
|
||||
Base: commonpbutil.NewMsgBase(),
|
||||
})
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to check balance status, %s"}`, err.Error())))
|
||||
return
|
||||
}
|
||||
|
||||
if !merr.Ok(resp.GetStatus()) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to check balance status, %s"}`, resp.GetStatus().GetReason())))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
balanceStatus := "suspended"
|
||||
if resp.IsActive {
|
||||
balanceStatus = "active"
|
||||
}
|
||||
w.Write([]byte(fmt.Sprintf(`{"msg": "OK", "status": "%v"}`, balanceStatus)))
|
||||
}
|
||||
|
||||
func (node *Proxy) SuspendQueryNode(w http.ResponseWriter, req *http.Request) {
|
||||
err := req.ParseForm()
|
||||
if err != nil {
|
||||
|
|
|
@ -371,6 +371,68 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *ProxyManagementSuite) TestCheckBalanceStatus() {
|
||||
s.Run("normal", func() {
|
||||
s.SetupTest()
|
||||
defer s.TearDownTest()
|
||||
|
||||
s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(&querypb.CheckBalanceStatusResponse{
|
||||
Status: merr.Success(),
|
||||
IsActive: true,
|
||||
}, nil).Times(1)
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
|
||||
s.Equal(http.StatusOK, recorder.Code)
|
||||
s.Equal(`{"msg": "OK", "status": "active"}`, recorder.Body.String())
|
||||
|
||||
s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(&querypb.CheckBalanceStatusResponse{
|
||||
Status: merr.Success(),
|
||||
IsActive: false,
|
||||
}, nil).Times(1)
|
||||
|
||||
req, err = http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
|
||||
s.Require().NoError(err)
|
||||
recorder = httptest.NewRecorder()
|
||||
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
|
||||
s.Equal(http.StatusOK, recorder.Code)
|
||||
s.Equal(`{"msg": "OK", "status": "suspended"}`, recorder.Body.String())
|
||||
})
|
||||
|
||||
s.Run("return_error", func() {
|
||||
s.SetupTest()
|
||||
defer s.TearDownTest()
|
||||
|
||||
s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
|
||||
s.Equal(http.StatusInternalServerError, recorder.Code)
|
||||
})
|
||||
|
||||
s.Run("return_failure", func() {
|
||||
s.SetupTest()
|
||||
defer s.TearDownTest()
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, management.RouteQueryCoordBalanceStatus, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.querycoord.EXPECT().CheckBalanceStatus(mock.Anything, mock.Anything).Return(&querypb.CheckBalanceStatusResponse{
|
||||
Status: merr.Status(merr.ErrServiceNotReady),
|
||||
}, nil)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
s.proxy.CheckQueryCoordBalanceStatus(recorder, req)
|
||||
s.Equal(http.StatusInternalServerError, recorder.Code)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *ProxyManagementSuite) TestSuspendQueryNode() {
|
||||
s.Run("normal", func() {
|
||||
s.SetupTest()
|
||||
|
|
|
@ -402,10 +402,18 @@ func (suite *OpsServiceSuite) TestSuspendAndResumeBalance() {
|
|||
suite.True(merr.Ok(resp))
|
||||
suite.False(suite.checkerController.IsActive(utils.BalanceChecker))
|
||||
|
||||
resp1, err := suite.server.CheckBalanceStatus(ctx, &querypb.CheckBalanceStatusRequest{})
|
||||
suite.NoError(err)
|
||||
suite.Equal(false, resp1.GetIsActive())
|
||||
|
||||
resp, err = suite.server.ResumeBalance(ctx, &querypb.ResumeBalanceRequest{})
|
||||
suite.NoError(err)
|
||||
suite.True(merr.Ok(resp))
|
||||
suite.True(suite.checkerController.IsActive(utils.BalanceChecker))
|
||||
|
||||
resp2, err := suite.server.CheckBalanceStatus(ctx, &querypb.CheckBalanceStatusRequest{})
|
||||
suite.NoError(err)
|
||||
suite.Equal(true, resp2.GetIsActive())
|
||||
}
|
||||
|
||||
func (suite *OpsServiceSuite) TestSuspendAndResumeNode() {
|
||||
|
|
|
@ -196,6 +196,33 @@ func (s *Server) ResumeBalance(ctx context.Context, req *querypb.ResumeBalanceRe
|
|||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
// CheckBalanceStatus checks whether balance is active or suspended
|
||||
func (s *Server) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
log := log.Ctx(ctx)
|
||||
log.Info("CheckBalanceStatus request received")
|
||||
|
||||
errMsg := "failed to check balance status"
|
||||
if err := merr.CheckHealthy(s.State()); err != nil {
|
||||
log.Warn(errMsg, zap.Error(err))
|
||||
return &querypb.CheckBalanceStatusResponse{
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
isActive, err := s.checkerController.IsActive(utils.BalanceChecker)
|
||||
if err != nil {
|
||||
log.Warn(errMsg, zap.Error(err))
|
||||
return &querypb.CheckBalanceStatusResponse{
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &querypb.CheckBalanceStatusResponse{
|
||||
Status: merr.Success(),
|
||||
IsActive: isActive,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// suspend node from resource operation, for given node, suspend load_segment/sub_channel operations
|
||||
func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) {
|
||||
log := log.Ctx(ctx)
|
||||
|
|
|
@ -162,6 +162,10 @@ func (m *GrpcQueryCoordClient) ResumeBalance(ctx context.Context, req *querypb.R
|
|||
return &commonpb.Status{}, m.Err
|
||||
}
|
||||
|
||||
func (m *GrpcQueryCoordClient) CheckBalanceStatus(ctx context.Context, req *querypb.CheckBalanceStatusRequest, opts ...grpc.CallOption) (*querypb.CheckBalanceStatusResponse, error) {
|
||||
return &querypb.CheckBalanceStatusResponse{}, m.Err
|
||||
}
|
||||
|
||||
func (m *GrpcQueryCoordClient) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{}, m.Err
|
||||
}
|
||||
|
|
|
@ -100,6 +100,7 @@ service QueryCoord {
|
|||
rpc GetQueryNodeDistribution(GetQueryNodeDistributionRequest) returns (GetQueryNodeDistributionResponse) {}
|
||||
rpc SuspendBalance(SuspendBalanceRequest) returns (common.Status) {}
|
||||
rpc ResumeBalance(ResumeBalanceRequest) returns (common.Status) {}
|
||||
rpc CheckBalanceStatus(CheckBalanceStatusRequest) returns (CheckBalanceStatusResponse) {}
|
||||
rpc SuspendNode(SuspendNodeRequest) returns (common.Status) {}
|
||||
rpc ResumeNode(ResumeNodeRequest) returns (common.Status) {}
|
||||
rpc TransferSegment(TransferSegmentRequest) returns (common.Status) {}
|
||||
|
@ -895,6 +896,15 @@ message ResumeBalanceRequest {
|
|||
common.MsgBase base = 1;
|
||||
}
|
||||
|
||||
message CheckBalanceStatusRequest {
|
||||
common.MsgBase base = 1;
|
||||
}
|
||||
|
||||
message CheckBalanceStatusResponse {
|
||||
common.Status status = 1;
|
||||
bool is_active = 2;
|
||||
}
|
||||
|
||||
message SuspendNodeRequest {
|
||||
common.MsgBase base = 1;
|
||||
int64 nodeID = 2;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -54,6 +54,7 @@ const (
|
|||
QueryCoord_GetQueryNodeDistribution_FullMethodName = "/milvus.proto.query.QueryCoord/GetQueryNodeDistribution"
|
||||
QueryCoord_SuspendBalance_FullMethodName = "/milvus.proto.query.QueryCoord/SuspendBalance"
|
||||
QueryCoord_ResumeBalance_FullMethodName = "/milvus.proto.query.QueryCoord/ResumeBalance"
|
||||
QueryCoord_CheckBalanceStatus_FullMethodName = "/milvus.proto.query.QueryCoord/CheckBalanceStatus"
|
||||
QueryCoord_SuspendNode_FullMethodName = "/milvus.proto.query.QueryCoord/SuspendNode"
|
||||
QueryCoord_ResumeNode_FullMethodName = "/milvus.proto.query.QueryCoord/ResumeNode"
|
||||
QueryCoord_TransferSegment_FullMethodName = "/milvus.proto.query.QueryCoord/TransferSegment"
|
||||
|
@ -101,6 +102,7 @@ type QueryCoordClient interface {
|
|||
GetQueryNodeDistribution(ctx context.Context, in *GetQueryNodeDistributionRequest, opts ...grpc.CallOption) (*GetQueryNodeDistributionResponse, error)
|
||||
SuspendBalance(ctx context.Context, in *SuspendBalanceRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
ResumeBalance(ctx context.Context, in *ResumeBalanceRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
CheckBalanceStatus(ctx context.Context, in *CheckBalanceStatusRequest, opts ...grpc.CallOption) (*CheckBalanceStatusResponse, error)
|
||||
SuspendNode(ctx context.Context, in *SuspendNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
ResumeNode(ctx context.Context, in *ResumeNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
TransferSegment(ctx context.Context, in *TransferSegmentRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
|
@ -405,6 +407,15 @@ func (c *queryCoordClient) ResumeBalance(ctx context.Context, in *ResumeBalanceR
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *queryCoordClient) CheckBalanceStatus(ctx context.Context, in *CheckBalanceStatusRequest, opts ...grpc.CallOption) (*CheckBalanceStatusResponse, error) {
|
||||
out := new(CheckBalanceStatusResponse)
|
||||
err := c.cc.Invoke(ctx, QueryCoord_CheckBalanceStatus_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *queryCoordClient) SuspendNode(ctx context.Context, in *SuspendNodeRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
out := new(commonpb.Status)
|
||||
err := c.cc.Invoke(ctx, QueryCoord_SuspendNode_FullMethodName, in, out, opts...)
|
||||
|
@ -498,6 +509,7 @@ type QueryCoordServer interface {
|
|||
GetQueryNodeDistribution(context.Context, *GetQueryNodeDistributionRequest) (*GetQueryNodeDistributionResponse, error)
|
||||
SuspendBalance(context.Context, *SuspendBalanceRequest) (*commonpb.Status, error)
|
||||
ResumeBalance(context.Context, *ResumeBalanceRequest) (*commonpb.Status, error)
|
||||
CheckBalanceStatus(context.Context, *CheckBalanceStatusRequest) (*CheckBalanceStatusResponse, error)
|
||||
SuspendNode(context.Context, *SuspendNodeRequest) (*commonpb.Status, error)
|
||||
ResumeNode(context.Context, *ResumeNodeRequest) (*commonpb.Status, error)
|
||||
TransferSegment(context.Context, *TransferSegmentRequest) (*commonpb.Status, error)
|
||||
|
@ -606,6 +618,9 @@ func (UnimplementedQueryCoordServer) SuspendBalance(context.Context, *SuspendBal
|
|||
func (UnimplementedQueryCoordServer) ResumeBalance(context.Context, *ResumeBalanceRequest) (*commonpb.Status, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ResumeBalance not implemented")
|
||||
}
|
||||
func (UnimplementedQueryCoordServer) CheckBalanceStatus(context.Context, *CheckBalanceStatusRequest) (*CheckBalanceStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method CheckBalanceStatus not implemented")
|
||||
}
|
||||
func (UnimplementedQueryCoordServer) SuspendNode(context.Context, *SuspendNodeRequest) (*commonpb.Status, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SuspendNode not implemented")
|
||||
}
|
||||
|
@ -1212,6 +1227,24 @@ func _QueryCoord_ResumeBalance_Handler(srv interface{}, ctx context.Context, dec
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _QueryCoord_CheckBalanceStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(CheckBalanceStatusRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(QueryCoordServer).CheckBalanceStatus(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: QueryCoord_CheckBalanceStatus_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(QueryCoordServer).CheckBalanceStatus(ctx, req.(*CheckBalanceStatusRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _QueryCoord_SuspendNode_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SuspendNodeRequest)
|
||||
if err := dec(in); err != nil {
|
||||
|
@ -1455,6 +1488,10 @@ var QueryCoord_ServiceDesc = grpc.ServiceDesc{
|
|||
MethodName: "ResumeBalance",
|
||||
Handler: _QueryCoord_ResumeBalance_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "CheckBalanceStatus",
|
||||
Handler: _QueryCoord_CheckBalanceStatus_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "SuspendNode",
|
||||
Handler: _QueryCoord_SuspendNode_Handler,
|
||||
|
|
Loading…
Reference in New Issue