From 83fdc1da7c46a0858319768315bd7d51920ad96a Mon Sep 17 00:00:00 2001 From: shaoyue Date: Wed, 23 Feb 2022 14:37:52 +0800 Subject: [PATCH] Add web server in proxy with sample handler (#15647) Signed-off-by: shaoyue.chen --- configs/milvus.yaml | 6 ++ go.mod | 1 + go.sum | 21 +++++ .../distributed/proxy/httpserver/handler.go | 41 +++++++++ .../proxy/httpserver/handler_test.go | 89 +++++++++++++++++++ .../distributed/proxy/httpserver/wrapper.go | 53 +++++++++++ .../proxy/httpserver/wrapper_test.go | 58 ++++++++++++ internal/distributed/proxy/service.go | 61 +++++++++++-- internal/distributed/proxy/service_test.go | 22 +++++ internal/util/paramtable/http_param.go | 50 +++++++++++ internal/util/paramtable/http_param_test.go | 17 ++++ 11 files changed, 414 insertions(+), 5 deletions(-) create mode 100644 internal/distributed/proxy/httpserver/handler.go create mode 100644 internal/distributed/proxy/httpserver/handler_test.go create mode 100644 internal/distributed/proxy/httpserver/wrapper.go create mode 100644 internal/distributed/proxy/httpserver/wrapper_test.go create mode 100644 internal/util/paramtable/http_param.go create mode 100644 internal/util/paramtable/http_param_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 73169a5d15..b1f0e02ffa 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -63,6 +63,12 @@ rootCoord: proxy: port: 19530 + http: + enabled: true # Whether to enable the http server + port: 8080 # Whether to enable the http server + readTimeout: 30000 # 30000 ms http read timeout + writeTimeout: 30000 # 30000 ms http write timeout + grpc: serverMaxRecvSize: 536870912 # 512 MB, 512 * 1024 * 1024 Bytes serverMaxSendSize: 536870912 # 512 MB, 512 * 1024 * 1024 Bytes diff --git a/go.mod b/go.mod index bbab282d30..72efae2a0b 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect + github.com/gin-gonic/gin v1.7.7 github.com/go-basic/ipv4 v1.0.0 github.com/golang/mock v1.5.0 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index f0ec138dac..d6ddd2030d 100644 --- a/go.sum +++ b/go.sum @@ -168,6 +168,10 @@ github.com/gdamore/tcell v1.3.0/go.mod h1:Hjvr+Ofd+gLglo7RYKxxnzCBmev3BzsS67MebK github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs= +github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U= github.com/go-basic/ipv4 v1.0.0 h1:gjyFAa1USC1hhXTkPOwBWDPfMcUaIM+tvo1XzV9EZxs= github.com/go-basic/ipv4 v1.0.0/go.mod h1:etLBnaxbidQfuqE6wgZQfs38nEWNmzALkxDZe4xY8Dg= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -181,6 +185,13 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= +github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= @@ -317,6 +328,7 @@ github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9q github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -348,6 +360,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k= github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s= github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= @@ -359,6 +373,8 @@ github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaW github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -533,6 +549,10 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24sz github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ= github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -747,6 +767,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/distributed/proxy/httpserver/handler.go b/internal/distributed/proxy/httpserver/handler.go new file mode 100644 index 0000000000..90b46b4822 --- /dev/null +++ b/internal/distributed/proxy/httpserver/handler.go @@ -0,0 +1,41 @@ +package httpserver + +import ( + "fmt" + + "github.com/gin-gonic/gin" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" +) + +// Handlers handles http requests +type Handlers struct { + proxy types.ProxyComponent +} + +// NewHandlers creates a new Handlers +func NewHandlers(proxy types.ProxyComponent) *Handlers { + return &Handlers{ + proxy: proxy, + } +} + +// RegisterRouters registers routes to given router +func (h *Handlers) RegisterRoutesTo(router gin.IRouter) { + router.GET("/health", wrapHandler(h.handleGetHealth)) + router.POST("/dummy", wrapHandler(h.handlePostDummy)) +} + +func (h *Handlers) handleGetHealth(c *gin.Context) (interface{}, error) { + return gin.H{"status": "ok"}, nil +} + +func (h *Handlers) handlePostDummy(c *gin.Context) (interface{}, error) { + req := milvuspb.DummyRequest{} + // use ShouldBind to supports binding JSON, XML, YAML, and protobuf. + err := c.ShouldBind(&req) + if err != nil { + return nil, fmt.Errorf("%w: parse json failed: %v", errBadRequest, err) + } + return h.proxy.Dummy(c, &req) +} diff --git a/internal/distributed/proxy/httpserver/handler_test.go b/internal/distributed/proxy/httpserver/handler_test.go new file mode 100644 index 0000000000..cfff1da92e --- /dev/null +++ b/internal/distributed/proxy/httpserver/handler_test.go @@ -0,0 +1,89 @@ +package httpserver + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" + "github.com/stretchr/testify/assert" +) + +type mockProxyComponent struct { + // wrap the interface to avoid implement not used func. + // and to let not implemented call panics + // implement the method you want to mock + types.ProxyComponent +} + +func (mockProxyComponent) Dummy(ctx context.Context, request *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) { + return nil, nil +} + +func TestHandlers(t *testing.T) { + mockProxy := mockProxyComponent{} + h := NewHandlers(mockProxy) + testEngine := gin.New() + h.RegisterRoutesTo(testEngine) + + t.Run("handleGetHealth default json ok", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/health", nil) + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, w.Body.Bytes(), []byte(`{"status":"ok"}`)) + }) + t.Run("handleGetHealth accept yaml ok", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/health", nil) + req.Header = http.Header{ + "Accept": []string{binding.MIMEYAML}, + } + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, w.Body.Bytes(), []byte("status: ok\n")) + }) + t.Run("handlePostDummy parsejson failed 400", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/dummy", nil) + req.Header = http.Header{ + "Content-Type": []string{binding.MIMEJSON}, + } + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusBadRequest, w.Code) + }) + t.Run("handlePostDummy parseyaml failed 400", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/dummy", nil) + req.Header = http.Header{ + "Content-Type": []string{binding.MIMEYAML}, + } + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusBadRequest, w.Code) + }) + t.Run("handlePostDummy default json ok", func(t *testing.T) { + bodyBytes := []byte("{}") + req := httptest.NewRequest(http.MethodPost, "/dummy", bytes.NewReader(bodyBytes)) + req.Header = http.Header{ + "Content-Type": []string{binding.MIMEJSON}, + } + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + }) + t.Run("handlePostDummy yaml ok", func(t *testing.T) { + bodyBytes := []byte("---") + req := httptest.NewRequest(http.MethodPost, "/dummy", bytes.NewReader(bodyBytes)) + req.Header = http.Header{ + "Content-Type": []string{binding.MIMEYAML}, + } + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + }) +} diff --git a/internal/distributed/proxy/httpserver/wrapper.go b/internal/distributed/proxy/httpserver/wrapper.go new file mode 100644 index 0000000000..3b83247c9b --- /dev/null +++ b/internal/distributed/proxy/httpserver/wrapper.go @@ -0,0 +1,53 @@ +package httpserver + +import ( + "errors" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" + "github.com/milvus-io/milvus/internal/proto/commonpb" +) + +var ( + errBadRequest = errors.New("bad request") +) + +// handlerFunc handles http request with gin context +type handlerFunc func(c *gin.Context) (interface{}, error) + +// ErrResponse of server +type ErrResponse = commonpb.Status + +// wrapHandler wraps a handlerFunc into a gin.HandlerFunc +func wrapHandler(handle handlerFunc) gin.HandlerFunc { + return func(c *gin.Context) { + data, err := handle(c) + // format body by accept header, protobuf marshal not supported by gin by default + // TODO: add marshal handler to support protobuf response + formatOffered := []string{binding.MIMEJSON, binding.MIMEYAML, binding.MIMEXML} + bodyFormatNegotiate := gin.Negotiate{ + Offered: formatOffered, + Data: data, + } + if err != nil { + switch { + case errors.Is(err, errBadRequest): + bodyFormatNegotiate.Data = ErrResponse{ + ErrorCode: commonpb.ErrorCode_IllegalArgument, + Reason: err.Error(), + } + c.Negotiate(http.StatusBadRequest, bodyFormatNegotiate) + return + default: + bodyFormatNegotiate.Data = ErrResponse{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + } + c.Negotiate(http.StatusInternalServerError, bodyFormatNegotiate) + return + } + } + c.Negotiate(http.StatusOK, bodyFormatNegotiate) + } +} diff --git a/internal/distributed/proxy/httpserver/wrapper_test.go b/internal/distributed/proxy/httpserver/wrapper_test.go new file mode 100644 index 0000000000..42d17fca36 --- /dev/null +++ b/internal/distributed/proxy/httpserver/wrapper_test.go @@ -0,0 +1,58 @@ +package httpserver + +import ( + "errors" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" +) + +func TestWrapHandler(t *testing.T) { + var testWrapFunc = func(c *gin.Context) (interface{}, error) { + Case := c.Param("case") + switch Case { + case "0": + return gin.H{"status": "ok"}, nil + case "1": + return nil, errBadRequest + case "2": + return nil, errors.New("internal err") + } + panic("shall not reach") + } + wrappedHandler := wrapHandler(testWrapFunc) + testEngine := gin.New() + testEngine.GET("/test/:case", wrappedHandler) + + t.Run("status ok", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/test/0", nil) + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) + }) + + t.Run("err notfound", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/test", nil) + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("err bad request", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/test/1", nil) + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusBadRequest, w.Code) + }) + + t.Run("err internal", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/test/2", nil) + w := httptest.NewRecorder() + testEngine.ServeHTTP(w, req) + assert.Equal(t, http.StatusInternalServerError, w.Code) + }) + +} diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index bbf599b695..65becfad4d 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -21,13 +21,16 @@ import ( "fmt" "io" "net" + "net/http" "strconv" "sync" "time" + "github.com/gin-gonic/gin" ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" icc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client" + "github.com/milvus-io/milvus/internal/distributed/proxy/httpserver" qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client" rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/log" @@ -51,6 +54,7 @@ import ( ) var Params paramtable.GrpcServerConfig +var HTTPParams paramtable.HTTPConfig // Server is the Proxy Server type Server struct { @@ -58,6 +62,9 @@ type Server struct { wg sync.WaitGroup proxy types.ProxyComponent grpcServer *grpc.Server + httpServer *http.Server + // avoid race + httpServerMtx sync.Mutex grpcErrChan chan error @@ -87,6 +94,27 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) return server, err } +// startHTTPServer starts the http server, panic when failed +func (s *Server) startHTTPServer(port int) { + defer s.wg.Done() + ginHandler := gin.Default() + apiv1 := ginHandler.Group("/api/v1") + httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1) + s.httpServerMtx.Lock() + s.httpServer = &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: ginHandler, + ReadTimeout: HTTPParams.ReadTimeout, + WriteTimeout: HTTPParams.WriteTimeout, + } + s.httpServerMtx.Unlock() + if err := s.httpServer.ListenAndServe(); err != nil { + if err != http.ErrServerClosed { + panic("failed to start http server: " + err.Error()) + } + } +} + func (s *Server) startGrpcLoop(grpcPort int) { defer s.wg.Done() @@ -159,6 +187,8 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.InitOnce(typeutil.ProxyRole) log.Debug("Proxy init service's parameter table done") + HTTPParams.InitOnce() + log.Debug("Proxy init http server's parameter table done") if !funcutil.CheckPortAvailable(Params.Port) { Params.Port = funcutil.GetAvailablePort() @@ -188,7 +218,12 @@ func (s *Server) init() error { log.Warn("failed to start Proxy's grpc server", zap.Error(err)) return err } - log.Debug("grpc server of Proxy has been started") + log.Debug("grpc server of proxy has been started") + if HTTPParams.Enabled { + log.Info("start http server of proxy", zap.Int("port", HTTPParams.Port)) + s.wg.Add(1) + go s.startHTTPServer(HTTPParams.Port) + } if s.rootCoordClient == nil { var err error @@ -347,10 +382,26 @@ func (s *Server) Stop() error { defer s.etcdCli.Close() } - if s.grpcServer != nil { - log.Debug("Graceful stop grpc server...") - s.grpcServer.GracefulStop() - } + gracefulWg := sync.WaitGroup{} + gracefulWg.Add(1) + go func() { + defer gracefulWg.Done() + s.httpServerMtx.Lock() + defer s.httpServerMtx.Unlock() + if s.httpServer != nil { + log.Debug("Graceful stop http server...") + s.httpServer.Shutdown(context.TODO()) + } + }() + gracefulWg.Add(1) + go func() { + defer gracefulWg.Done() + if s.grpcServer != nil { + log.Debug("Graceful stop grpc server...") + s.grpcServer.GracefulStop() + } + }() + gracefulWg.Wait() err = s.proxy.Stop() if err != nil { diff --git a/internal/distributed/proxy/service_test.go b/internal/distributed/proxy/service_test.go index 5a8f215b03..e449d193df 100644 --- a/internal/distributed/proxy/service_test.go +++ b/internal/distributed/proxy/service_test.go @@ -880,3 +880,25 @@ func Test_NewServer(t *testing.T) { err = server.Stop() assert.Nil(t, err) } + +func Test_NewServer_HTTPServerDisabled(t *testing.T) { + ctx := context.Background() + server, err := NewServer(ctx, nil) + assert.NotNil(t, server) + assert.Nil(t, err) + + server.proxy = &MockProxy{} + server.rootCoordClient = &MockRootCoord{} + server.indexCoordClient = &MockIndexCoord{} + server.queryCoordClient = &MockQueryCoord{} + server.dataCoordClient = &MockDataCoord{} + + HTTPParams.InitOnce() + HTTPParams.Enabled = false + + err = server.Run() + assert.Nil(t, err) + assert.Nil(t, server.httpServer) + err = server.Stop() + assert.Nil(t, err) +} diff --git a/internal/util/paramtable/http_param.go b/internal/util/paramtable/http_param.go new file mode 100644 index 0000000000..533b68199c --- /dev/null +++ b/internal/util/paramtable/http_param.go @@ -0,0 +1,50 @@ +package paramtable + +import ( + "sync" + "time" +) + +type HTTPConfig struct { + BaseTable + + once sync.Once + Enabled bool + Port int + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +// InitOnce initialize HTTPConfig +func (p *HTTPConfig) InitOnce() { + p.once.Do(func() { + p.init() + }) +} + +func (p *HTTPConfig) init() { + p.BaseTable.Init() + + p.initHTTPEnabled() + p.initHTTPPort() + p.initHTTPReadTimeout() + p.initHTTPWriteTimeout() +} + +func (p *HTTPConfig) initHTTPEnabled() { + p.Enabled = p.ParseBool("proxy.http.enabled", true) +} + +func (p *HTTPConfig) initHTTPPort() { + p.Port = p.ParseIntWithDefault("proxy.http.port", 8080) +} + +func (p *HTTPConfig) initHTTPReadTimeout() { + interval := p.ParseIntWithDefault("proxy.http.readTimeout", 30000) + p.ReadTimeout = time.Duration(interval) * time.Millisecond +} + +func (p *HTTPConfig) initHTTPWriteTimeout() { + interval := p.ParseIntWithDefault("proxy.http.writeTimeout", 30000) + p.WriteTimeout = time.Duration(interval) * time.Millisecond +} diff --git a/internal/util/paramtable/http_param_test.go b/internal/util/paramtable/http_param_test.go new file mode 100644 index 0000000000..32dde1165a --- /dev/null +++ b/internal/util/paramtable/http_param_test.go @@ -0,0 +1,17 @@ +package paramtable + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestHTTPConfig_Init(t *testing.T) { + cf := new(HTTPConfig) + cf.InitOnce() + assert.Equal(t, cf.Enabled, true) + assert.Equal(t, cf.Port, 8080) + assert.Equal(t, cf.ReadTimeout, time.Second*30) + assert.Equal(t, cf.WriteTimeout, time.Second*30) +}