enhance: add restful api to trigger component stop (#32076)

issue: #32698
This PR add two rest api for component stop and status check:
1. `/management/stop?role=querynode` can stop the specified component
2. `/management/check/ready?role=rootcoord` can check whether the target
component is serviceable

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/33707/head
wei liu 2024-06-07 10:35:54 +08:00 committed by GitHub
parent e3d50a192d
commit 0a3d456688
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 164 additions and 78 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/http/healthz"
@ -254,11 +255,11 @@ func (mr *MilvusRoles) setupLogger() {
func setupPrometheusHTTPServer(r *internalmetrics.MilvusRegistry) {
log.Info("setupPrometheusHTTPServer")
http.Register(&http.Handler{
Path: "/metrics",
Path: http.MetricsPath,
Handler: promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
})
http.Register(&http.Handler{
Path: "/metrics_default",
Path: http.MetricsDefaultPath,
Handler: promhttp.Handler(),
})
}
@ -357,41 +358,71 @@ func (mr *MilvusRoles) Run() {
var wg sync.WaitGroup
local := mr.Local
componentMap := make(map[string]component)
var rootCoord, queryCoord, indexCoord, dataCoord component
var proxy, dataNode, indexNode, queryNode component
if mr.EnableRootCoord {
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
}
if mr.EnableDataCoord {
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
}
if mr.EnableIndexCoord {
indexCoord = mr.runIndexCoord(ctx, local, &wg)
componentMap[typeutil.IndexCoordRole] = indexCoord
}
if mr.EnableQueryCoord {
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
}
if mr.EnableQueryNode {
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
}
if mr.EnableDataNode {
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
}
if mr.EnableIndexNode {
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
}
if mr.EnableProxy {
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
}
wg.Wait()
http.RegisterStopComponent(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
return fmt.Errorf("stop component [%s] in [%s] is not supported", role, mr.ServerType)
}
return componentMap[role].Stop()
})
http.RegisterCheckComponentReady(func(role string) error {
if len(role) == 0 || componentMap[role] == nil {
return fmt.Errorf("check component state for [%s] in [%s] is not supported", role, mr.ServerType)
}
// for coord component, if it's in standby state, it will return StateCode_StandBy
code := componentMap[role].Health(context.TODO())
if code != commonpb.StateCode_Healthy {
return fmt.Errorf("component [%s] in [%s] is not healthy", role, mr.ServerType)
}
return nil
})
mr.setupLogger()
tracer.Init()
paramtable.Get().WatchKeyPrefix("trace", config.NewHandler("tracing handler", func(e *config.Event) {

View File

@ -163,7 +163,7 @@ func (s *Server) registerHTTPServer() {
apiv1 := metricsGinHandler.Group(apiPathPrefix)
httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1)
management.Register(&management.Handler{
Path: "/",
Path: management.RootPath,
HandlerFunc: nil,
Handler: metricsGinHandler.Handler(),
})

View File

@ -27,3 +27,34 @@ const EventLogRouterPath = "/eventlog"
// ExprPath is path for expression.
const ExprPath = "/expr"
const RootPath = "/"
// Prometheus restful api path
const (
MetricsPath = "/metrics"
MetricsDefaultPath = "/metrics_default"
)
// for every component, register it's own api to trigger stop and check ready
const (
RouteTriggerStopPath = "/management/stop"
RouteCheckComponentReady = "/management/check/ready"
)
// proxy management restful api root path
const (
RouteGcPause = "/management/datacoord/garbage_collection/pause"
RouteGcResume = "/management/datacoord/garbage_collection/resume"
RouteSuspendQueryCoordBalance = "/management/querycoord/balance/suspend"
RouteResumeQueryCoordBalance = "/management/querycoord/balance/resume"
RouteTransferSegment = "/management/querycoord/transfer/segment"
RouteTransferChannel = "/management/querycoord/transfer/channel"
RouteSuspendQueryNode = "/management/querycoord/node/suspend"
RouteResumeQueryNode = "/management/querycoord/node/resume"
RouteListQueryNode = "/management/querycoord/node/list"
RouteGetQueryNodeDistribution = "/management/querycoord/distribution/get"
RouteCheckQueryNodeDistribution = "/management/querycoord/distribution/check"
)

View File

@ -88,6 +88,46 @@ func registerDefaults() {
})
}
func RegisterStopComponent(triggerComponentStop func(role string) error) {
// register restful api to trigger stop
Register(&Handler{
Path: RouteTriggerStopPath,
HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
role := req.URL.Query().Get("role")
log.Info("start to trigger component stop", zap.String("role", role))
if err := triggerComponentStop(role); err != nil {
log.Warn("failed to trigger component stop", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to trigger component stop, %s"}`, err.Error())))
return
}
log.Info("finish to trigger component stop", zap.String("role", role))
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"msg": "OK"}`))
},
})
}
func RegisterCheckComponentReady(checkActive func(role string) error) {
// register restful api to check component ready
Register(&Handler{
Path: RouteCheckComponentReady,
HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
role := req.URL.Query().Get("role")
log.Info("start to check component ready", zap.String("role", role))
if err := checkActive(role); err != nil {
log.Warn("failed to check component ready", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to to check component ready, %s"}`, err.Error())))
return
}
log.Info("finish to check component ready", zap.String("role", role))
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"msg": "OK"}`))
},
})
}
func Register(h *Handler) {
if metricsServer == nil {
if paramtable.Get().HTTPCfg.EnablePprof.GetAsBool() {

View File

@ -32,69 +32,52 @@ import (
)
// this file contains proxy management restful API handler
const (
mgrRouteGcPause = `/management/datacoord/garbage_collection/pause`
mgrRouteGcResume = `/management/datacoord/garbage_collection/resume`
mgrSuspendQueryCoordBalance = `/management/querycoord/balance/suspend`
mgrResumeQueryCoordBalance = `/management/querycoord/balance/resume`
mgrTransferSegment = `/management/querycoord/transfer/segment`
mgrTransferChannel = `/management/querycoord/transfer/channel`
mgrSuspendQueryNode = `/management/querycoord/node/suspend`
mgrResumeQueryNode = `/management/querycoord/node/resume`
mgrListQueryNode = `/management/querycoord/node/list`
mgrGetQueryNodeDistribution = `/management/querycoord/distribution/get`
mgrCheckQueryNodeDistribution = `/management/querycoord/distribution/check`
)
var mgrRouteRegisterOnce sync.Once
func RegisterMgrRoute(proxy *Proxy) {
mgrRouteRegisterOnce.Do(func() {
management.Register(&management.Handler{
Path: mgrRouteGcPause,
Path: management.RouteGcPause,
HandlerFunc: proxy.PauseDatacoordGC,
})
management.Register(&management.Handler{
Path: mgrRouteGcResume,
Path: management.RouteGcResume,
HandlerFunc: proxy.ResumeDatacoordGC,
})
management.Register(&management.Handler{
Path: mgrListQueryNode,
Path: management.RouteListQueryNode,
HandlerFunc: proxy.ListQueryNode,
})
management.Register(&management.Handler{
Path: mgrGetQueryNodeDistribution,
Path: management.RouteGetQueryNodeDistribution,
HandlerFunc: proxy.GetQueryNodeDistribution,
})
management.Register(&management.Handler{
Path: mgrSuspendQueryCoordBalance,
Path: management.RouteSuspendQueryCoordBalance,
HandlerFunc: proxy.SuspendQueryCoordBalance,
})
management.Register(&management.Handler{
Path: mgrResumeQueryCoordBalance,
Path: management.RouteResumeQueryCoordBalance,
HandlerFunc: proxy.ResumeQueryCoordBalance,
})
management.Register(&management.Handler{
Path: mgrSuspendQueryNode,
Path: management.RouteSuspendQueryNode,
HandlerFunc: proxy.SuspendQueryNode,
})
management.Register(&management.Handler{
Path: mgrResumeQueryNode,
Path: management.RouteResumeQueryNode,
HandlerFunc: proxy.ResumeQueryNode,
})
management.Register(&management.Handler{
Path: mgrTransferSegment,
Path: management.RouteTransferSegment,
HandlerFunc: proxy.TransferSegment,
})
management.Register(&management.Handler{
Path: mgrTransferChannel,
Path: management.RouteTransferChannel,
HandlerFunc: proxy.TransferChannel,
})
management.Register(&management.Handler{
Path: mgrCheckQueryNodeDistribution,
Path: management.RouteCheckQueryNodeDistribution,
HandlerFunc: proxy.CheckQueryNodeDistribution,
})
})

View File

@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
management "github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -66,7 +67,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() {
return &commonpb.Status{}, nil
})
req, err := http.NewRequest(http.MethodGet, mgrRouteGcPause+"?pause_seconds=60", nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -82,7 +83,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() {
return &commonpb.Status{}, errors.New("mock")
})
req, err := http.NewRequest(http.MethodGet, mgrRouteGcPause+"?pause_seconds=60", nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -101,7 +102,7 @@ func (s *ProxyManagementSuite) TestPauseDataCoordGC() {
}, nil
})
req, err := http.NewRequest(http.MethodGet, mgrRouteGcPause+"?pause_seconds=60", nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcPause+"?pause_seconds=60", nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -120,7 +121,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() {
return &commonpb.Status{}, nil
})
req, err := http.NewRequest(http.MethodGet, mgrRouteGcResume, nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -136,7 +137,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() {
return &commonpb.Status{}, errors.New("mock")
})
req, err := http.NewRequest(http.MethodGet, mgrRouteGcResume, nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -155,7 +156,7 @@ func (s *ProxyManagementSuite) TestResumeDatacoordGC() {
}, nil
})
req, err := http.NewRequest(http.MethodGet, mgrRouteGcResume, nil)
req, err := http.NewRequest(http.MethodGet, management.RouteGcResume, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -181,7 +182,7 @@ func (s *ProxyManagementSuite) TestListQueryNode() {
},
}, nil)
req, err := http.NewRequest(http.MethodPost, mgrListQueryNode, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteListQueryNode, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -195,7 +196,7 @@ func (s *ProxyManagementSuite) TestListQueryNode() {
defer s.TearDownTest()
s.querycoord.EXPECT().ListQueryNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err := http.NewRequest(http.MethodPost, mgrListQueryNode, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteListQueryNode, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -211,7 +212,7 @@ func (s *ProxyManagementSuite) TestListQueryNode() {
Status: merr.Status(merr.ErrServiceNotReady),
}, nil)
req, err := http.NewRequest(http.MethodPost, mgrListQueryNode, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteListQueryNode, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -232,7 +233,7 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() {
SealedSegmentIDs: []int64{1, 2, 3},
}, nil)
req, err := http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@ -247,14 +248,14 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() {
defer s.TearDownTest()
// test invalid request body
req, err := http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
s.proxy.GetQueryNodeDistribution(recorder, req)
s.Equal(http.StatusBadRequest, recorder.Code)
// test miss requested param
req, err = http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader(""))
req, err = http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader(""))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -263,7 +264,7 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() {
// test rpc return error
s.querycoord.EXPECT().GetQueryNodeDistribution(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err = http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("node_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -276,7 +277,7 @@ func (s *ProxyManagementSuite) TestGetQueryNodeDistribution() {
defer s.TearDownTest()
s.querycoord.EXPECT().GetQueryNodeDistribution(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err := http.NewRequest(http.MethodPost, mgrGetQueryNodeDistribution, strings.NewReader("node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteGetQueryNodeDistribution, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -292,7 +293,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryCoordBalance() {
s.querycoord.EXPECT().SuspendBalance(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryCoordBalance, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryCoordBalance, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -306,7 +307,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryCoordBalance() {
defer s.TearDownTest()
s.querycoord.EXPECT().SuspendBalance(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryCoordBalance, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryCoordBalance, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -319,7 +320,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryCoordBalance() {
defer s.TearDownTest()
s.querycoord.EXPECT().SuspendBalance(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil)
req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryCoordBalance, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryCoordBalance, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -335,7 +336,7 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() {
s.querycoord.EXPECT().ResumeBalance(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrResumeQueryCoordBalance, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryCoordBalance, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -349,7 +350,7 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() {
defer s.TearDownTest()
s.querycoord.EXPECT().ResumeBalance(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err := http.NewRequest(http.MethodPost, mgrResumeQueryCoordBalance, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryCoordBalance, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -362,7 +363,7 @@ func (s *ProxyManagementSuite) TestResumeQueryCoordBalance() {
defer s.TearDownTest()
s.querycoord.EXPECT().ResumeBalance(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil)
req, err := http.NewRequest(http.MethodPost, mgrResumeQueryCoordBalance, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryCoordBalance, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
@ -378,7 +379,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() {
s.querycoord.EXPECT().SuspendNode(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@ -393,14 +394,14 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() {
defer s.TearDownTest()
// test invalid request body
req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryNode, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
s.proxy.SuspendQueryNode(recorder, req)
s.Equal(http.StatusBadRequest, recorder.Code)
// test miss requested param
req, err = http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader(""))
req, err = http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader(""))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -409,7 +410,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() {
// test rpc return error
s.querycoord.EXPECT().SuspendNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err = http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("node_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -422,7 +423,7 @@ func (s *ProxyManagementSuite) TestSuspendQueryNode() {
defer s.TearDownTest()
s.querycoord.EXPECT().SuspendNode(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil)
req, err := http.NewRequest(http.MethodPost, mgrSuspendQueryNode, strings.NewReader("node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteSuspendQueryNode, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -438,7 +439,7 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() {
s.querycoord.EXPECT().ResumeNode(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
@ -453,14 +454,14 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() {
defer s.TearDownTest()
// test invalid request body
req, err := http.NewRequest(http.MethodPost, mgrResumeQueryNode, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
s.proxy.ResumeQueryNode(recorder, req)
s.Equal(http.StatusBadRequest, recorder.Code)
// test miss requested param
req, err = http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader(""))
req, err = http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader(""))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -469,7 +470,7 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() {
// test rpc return error
s.querycoord.EXPECT().ResumeNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err = http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("node_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -482,7 +483,7 @@ func (s *ProxyManagementSuite) TestResumeQueryNode() {
defer s.TearDownTest()
s.querycoord.EXPECT().ResumeNode(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err := http.NewRequest(http.MethodPost, mgrResumeQueryNode, strings.NewReader("node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteResumeQueryNode, strings.NewReader("node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -498,7 +499,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() {
s.querycoord.EXPECT().TransferSegment(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1&copy_mode=false"))
req, err := http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1&copy_mode=false"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -507,7 +508,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() {
s.Equal(`{"msg": "OK"}`, recorder.Body.String())
// test use default param
req, err = http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -521,14 +522,14 @@ func (s *ProxyManagementSuite) TestTransferSegment() {
defer s.TearDownTest()
// test invalid request body
req, err := http.NewRequest(http.MethodPost, mgrTransferSegment, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteTransferSegment, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
s.proxy.TransferSegment(recorder, req)
s.Equal(http.StatusBadRequest, recorder.Code)
// test miss requested param
req, err = http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader(""))
req, err = http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader(""))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -537,7 +538,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() {
// test rpc return error
s.querycoord.EXPECT().TransferSegment(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err = http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -550,7 +551,7 @@ func (s *ProxyManagementSuite) TestTransferSegment() {
defer s.TearDownTest()
s.querycoord.EXPECT().TransferSegment(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil)
req, err := http.NewRequest(http.MethodPost, mgrTransferSegment, strings.NewReader("source_node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteTransferSegment, strings.NewReader("source_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -566,7 +567,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() {
s.querycoord.EXPECT().TransferChannel(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1&copy_mode=false"))
req, err := http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1&copy_mode=false"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -575,7 +576,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() {
s.Equal(`{"msg": "OK"}`, recorder.Body.String())
// test use default param
req, err = http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -589,14 +590,14 @@ func (s *ProxyManagementSuite) TestTransferChannel() {
defer s.TearDownTest()
// test invalid request body
req, err := http.NewRequest(http.MethodPost, mgrTransferChannel, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteTransferChannel, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
s.proxy.TransferChannel(recorder, req)
s.Equal(http.StatusBadRequest, recorder.Code)
// test miss requested param
req, err = http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader(""))
req, err = http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader(""))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -605,7 +606,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() {
// test rpc return error
s.querycoord.EXPECT().TransferChannel(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err = http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1&target_node_id=1&segment_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -618,7 +619,7 @@ func (s *ProxyManagementSuite) TestTransferChannel() {
defer s.TearDownTest()
s.querycoord.EXPECT().TransferChannel(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil)
req, err := http.NewRequest(http.MethodPost, mgrTransferChannel, strings.NewReader("source_node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteTransferChannel, strings.NewReader("source_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -634,7 +635,7 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() {
s.querycoord.EXPECT().CheckQueryNodeDistribution(mock.Anything, mock.Anything).Return(merr.Success(), nil)
req, err := http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()
@ -648,14 +649,14 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() {
defer s.TearDownTest()
// test invalid request body
req, err := http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, nil)
req, err := http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, nil)
s.Require().NoError(err)
recorder := httptest.NewRecorder()
s.proxy.CheckQueryNodeDistribution(recorder, req)
s.Equal(http.StatusBadRequest, recorder.Code)
// test miss requested param
req, err = http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader(""))
req, err = http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader(""))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -664,7 +665,7 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() {
// test rpc return error
s.querycoord.EXPECT().CheckQueryNodeDistribution(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
req, err = http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1"))
req, err = http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder = httptest.NewRecorder()
@ -677,7 +678,7 @@ func (s *ProxyManagementSuite) TestCheckQueryNodeDistribution() {
defer s.TearDownTest()
s.querycoord.EXPECT().CheckQueryNodeDistribution(mock.Anything, mock.Anything).Return(merr.Status(merr.ErrServiceNotReady), nil)
req, err := http.NewRequest(http.MethodPost, mgrCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1"))
req, err := http.NewRequest(http.MethodPost, management.RouteCheckQueryNodeDistribution, strings.NewReader("source_node_id=1&target_node_id=1"))
s.Require().NoError(err)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
recorder := httptest.NewRecorder()