mirror of https://github.com/milvus-io/milvus.git
Add grpc healthy check (#16050)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/16050/merge
parent
97b1ed7bca
commit
0bd07daae3
|
@ -50,6 +50,7 @@ import (
|
|||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
|
@ -157,7 +158,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...)))
|
||||
proxypb.RegisterProxyServer(s.grpcServer, s)
|
||||
milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
|
||||
|
||||
grpc_health_v1.RegisterHealthServer(s.grpcServer, s)
|
||||
log.Debug("create Proxy grpc server",
|
||||
zap.Any("enforcement policy", kaep),
|
||||
zap.Any("server parameters", kasp))
|
||||
|
@ -645,3 +646,41 @@ func (s *Server) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milv
|
|||
func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
|
||||
return s.proxy.GetImportState(ctx, req)
|
||||
}
|
||||
|
||||
// Check is required by gRPC healthy checking
|
||||
func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
|
||||
ret := &grpc_health_v1.HealthCheckResponse{
|
||||
Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
|
||||
}
|
||||
state, err := s.proxy.GetComponentStates(ctx)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
if state.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return ret, nil
|
||||
}
|
||||
if state.State.StateCode != internalpb.StateCode_Healthy {
|
||||
return ret, nil
|
||||
}
|
||||
ret.Status = grpc_health_v1.HealthCheckResponse_SERVING
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Watch is required by gRPC healthy checking
|
||||
func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error {
|
||||
ret := &grpc_health_v1.HealthCheckResponse{
|
||||
Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
|
||||
}
|
||||
state, err := s.proxy.GetComponentStates(s.ctx)
|
||||
if err != nil {
|
||||
return server.Send(ret)
|
||||
}
|
||||
if state.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return server.Send(ret)
|
||||
}
|
||||
if state.State.StateCode != internalpb.StateCode_Healthy {
|
||||
return server.Send(ret)
|
||||
}
|
||||
ret.Status = grpc_health_v1.HealthCheckResponse_SERVING
|
||||
return server.Send(ret)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package grpcproxy
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -30,16 +31,42 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/proxy"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
milvusmock "github.com/milvus-io/milvus/internal/util/mock"
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
type MockBase struct {
|
||||
mock.Mock
|
||||
isMockGetComponentStatesOn bool
|
||||
}
|
||||
|
||||
func (m *MockBase) On(methodName string, arguments ...interface{}) *mock.Call {
|
||||
if methodName == "GetComponentStates" {
|
||||
m.isMockGetComponentStatesOn = true
|
||||
}
|
||||
return m.Mock.On(methodName, arguments...)
|
||||
}
|
||||
|
||||
func (m *MockBase) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
if m.isMockGetComponentStatesOn {
|
||||
ret1 := &internalpb.ComponentStates{}
|
||||
var ret2 error
|
||||
args := m.Called(ctx)
|
||||
arg1 := args.Get(0)
|
||||
arg2 := args.Get(1)
|
||||
if arg1 != nil {
|
||||
ret1 = arg1.(*internalpb.ComponentStates)
|
||||
}
|
||||
if arg2 != nil {
|
||||
ret2 = arg2.(error)
|
||||
}
|
||||
return ret1, ret2
|
||||
}
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{StateCode: internalpb.StateCode_Healthy},
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
|
@ -431,6 +458,7 @@ type MockProxy struct {
|
|||
startErr error
|
||||
stopErr error
|
||||
regErr error
|
||||
isMockOn bool
|
||||
}
|
||||
|
||||
func (m *MockProxy) Init() error {
|
||||
|
@ -901,6 +929,128 @@ func Test_NewServer(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestServer_Check(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
server, err := NewServer(ctx, nil)
|
||||
assert.NotNil(t, server)
|
||||
assert.Nil(t, err)
|
||||
|
||||
mockProxy := &MockProxy{}
|
||||
server.proxy = mockProxy
|
||||
server.rootCoordClient = &MockRootCoord{}
|
||||
server.indexCoordClient = &MockIndexCoord{}
|
||||
server.queryCoordClient = &MockQueryCoord{}
|
||||
server.dataCoordClient = &MockDataCoord{}
|
||||
|
||||
req := &grpc_health_v1.HealthCheckRequest{Service: ""}
|
||||
ret, err := server.Check(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
|
||||
|
||||
mockProxy.On("GetComponentStates", ctx).Return(nil, fmt.Errorf("mock grpc unexpected error")).Once()
|
||||
|
||||
ret, err = server.Check(ctx, req)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
componentInfo := &internalpb.ComponentInfo{
|
||||
NodeID: 0,
|
||||
Role: "proxy",
|
||||
StateCode: internalpb.StateCode_Abnormal,
|
||||
}
|
||||
status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
|
||||
componentState := &internalpb.ComponentStates{
|
||||
State: componentInfo,
|
||||
Status: status,
|
||||
}
|
||||
mockProxy.On("GetComponentStates", ctx).Return(componentState, nil)
|
||||
|
||||
ret, err = server.Check(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
status.ErrorCode = commonpb.ErrorCode_Success
|
||||
ret, err = server.Check(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
componentInfo.StateCode = internalpb.StateCode_Initializing
|
||||
ret, err = server.Check(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
componentInfo.StateCode = internalpb.StateCode_Healthy
|
||||
ret, err = server.Check(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
|
||||
}
|
||||
|
||||
func TestServer_Watch(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
server, err := NewServer(ctx, nil)
|
||||
assert.NotNil(t, server)
|
||||
assert.Nil(t, err)
|
||||
|
||||
mockProxy := &MockProxy{}
|
||||
server.proxy = mockProxy
|
||||
server.rootCoordClient = &MockRootCoord{}
|
||||
server.indexCoordClient = &MockIndexCoord{}
|
||||
server.queryCoordClient = &MockQueryCoord{}
|
||||
server.dataCoordClient = &MockDataCoord{}
|
||||
|
||||
watchServer := milvusmock.NewGrpcHealthWatchServer()
|
||||
resultChan := watchServer.Chan()
|
||||
req := &grpc_health_v1.HealthCheckRequest{Service: ""}
|
||||
//var ret *grpc_health_v1.HealthCheckResponse
|
||||
err = server.Watch(req, watchServer)
|
||||
ret := <-resultChan
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
|
||||
|
||||
mockProxy.On("GetComponentStates", ctx).Return(nil, fmt.Errorf("mock grpc unexpected error")).Once()
|
||||
|
||||
err = server.Watch(req, watchServer)
|
||||
ret = <-resultChan
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
componentInfo := &internalpb.ComponentInfo{
|
||||
NodeID: 0,
|
||||
Role: "proxy",
|
||||
StateCode: internalpb.StateCode_Abnormal,
|
||||
}
|
||||
status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
|
||||
componentState := &internalpb.ComponentStates{
|
||||
State: componentInfo,
|
||||
Status: status,
|
||||
}
|
||||
mockProxy.On("GetComponentStates", ctx).Return(componentState, nil)
|
||||
|
||||
err = server.Watch(req, watchServer)
|
||||
ret = <-resultChan
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
status.ErrorCode = commonpb.ErrorCode_Success
|
||||
err = server.Watch(req, watchServer)
|
||||
ret = <-resultChan
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
componentInfo.StateCode = internalpb.StateCode_Initializing
|
||||
err = server.Watch(req, watchServer)
|
||||
ret = <-resultChan
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING, ret.Status)
|
||||
|
||||
componentInfo.StateCode = internalpb.StateCode_Healthy
|
||||
err = server.Watch(req, watchServer)
|
||||
ret = <-resultChan
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, ret.Status)
|
||||
}
|
||||
|
||||
func Test_NewServer_HTTPServerDisabled(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
server, err := NewServer(ctx, nil)
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
type GrpcHealthWatchServer struct {
|
||||
chanResult chan *grpc_health_v1.HealthCheckResponse
|
||||
}
|
||||
|
||||
func NewGrpcHealthWatchServer() *GrpcHealthWatchServer {
|
||||
return &GrpcHealthWatchServer{
|
||||
chanResult: make(chan *grpc_health_v1.HealthCheckResponse, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) Send(response *grpc_health_v1.HealthCheckResponse) error {
|
||||
m.chanResult <- response
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) Chan() <-chan *grpc_health_v1.HealthCheckResponse {
|
||||
return m.chanResult
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) SetHeader(md metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) SendHeader(md metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) SetTrailer(md metadata.MD) {
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) Context() context.Context {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) SendMsg(msg interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m GrpcHealthWatchServer) RecvMsg(msg interface{}) error {
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue