Add GetTimeTickChannel, GetComponentStates to proxy service

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/4973/head^2
dragondriver 2021-01-26 14:55:57 +08:00 committed by yefu.chen
parent b445587e11
commit d972b75aaa
7 changed files with 154 additions and 40 deletions

View File

@ -40,7 +40,7 @@ func (s *Server) connectProxyService() error {
proxynode.Params.Init()
s.proxyServiceAddress = proxynode.Params.ProxyServiceAddress()
s.proxyServiceClient = grpcproxyservice.NewClient(s.ctx, s.proxyServiceAddress)
s.proxyServiceClient = grpcproxyservice.NewClient(s.proxyServiceAddress)
getAvailablePort := func() int {
listener, err := net.Listen("tcp", ":0")

View File

@ -44,9 +44,9 @@ func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMe
return err
}
func NewClient(ctx context.Context, address string) *Client {
func NewClient(address string) *Client {
return &Client{
address: address,
ctx: ctx,
ctx: context.Background(),
}
}

View File

@ -7,6 +7,8 @@ import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -24,6 +26,26 @@ type Server struct {
grpcServer *grpc.Server
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
channel, err := s.impl.GetTimeTickChannel()
if err != nil {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
Value: "",
}, nil
}
return &milvuspb.StringResponse{
Value: channel,
}, nil
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.impl.GetComponentStates()
}
func CreateProxyServiceServer() (*Server, error) {
return &Server{}, nil
}

View File

@ -5,6 +5,7 @@ option go_package = "github.com/zilliztech/milvus-distributed/internal/proto/pro
import "common.proto";
import "internal.proto";
import "milvus.proto";
message RegisterNodeRequest {
@ -24,6 +25,8 @@ message InvalidateCollMetaCacheRequest {
}
service ProxyService {
rpc GetTimeTickChannel(common.Empty) returns (milvus.StringResponse) {}
rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {}
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {}
rpc InvalidateCollectionMetaCache(InvalidateCollMetaCacheRequest) returns (common.Status) {}
}

View File

@ -9,6 +9,7 @@ import (
proto "github.com/golang/protobuf/proto"
commonpb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
internalpb2 "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
milvuspb "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@ -184,33 +185,37 @@ func init() {
func init() { proto.RegisterFile("proxy_service.proto", fileDescriptor_34ca2fbc94d169de) }
var fileDescriptor_34ca2fbc94d169de = []byte{
// 403 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x92, 0x3f, 0xaf, 0xda, 0x30,
0x14, 0xc5, 0x49, 0x5b, 0x81, 0x6a, 0x10, 0xad, 0x4c, 0xa5, 0xa2, 0xf4, 0x8f, 0xda, 0x2c, 0xb0,
0x34, 0xa9, 0x82, 0xd4, 0xb5, 0x02, 0x26, 0x06, 0x10, 0x0a, 0x5b, 0x17, 0xe4, 0xc4, 0x57, 0x60,
0xc9, 0xb1, 0x53, 0xdb, 0x41, 0x2d, 0x4b, 0xb7, 0xae, 0x5d, 0xfa, 0x1d, 0xfb, 0x35, 0xaa, 0xd8,
0x84, 0xf7, 0xa2, 0x87, 0x78, 0xc3, 0x1b, 0xde, 0x96, 0xeb, 0xfc, 0x7c, 0x72, 0xee, 0x39, 0x41,
0x83, 0x42, 0xc9, 0x1f, 0x3f, 0xb7, 0x1a, 0xd4, 0x81, 0x65, 0x10, 0x16, 0x4a, 0x1a, 0x89, 0x71,
0xce, 0xf8, 0xa1, 0xd4, 0x6e, 0x0a, 0x2d, 0xe1, 0xf7, 0x32, 0x99, 0xe7, 0x52, 0xb8, 0x33, 0xbf,
0xcf, 0x84, 0x01, 0x25, 0x08, 0x77, 0x73, 0xf0, 0x0b, 0x0d, 0x12, 0xd8, 0x31, 0x6d, 0x40, 0xad,
0x24, 0x85, 0x04, 0xbe, 0x97, 0xa0, 0x0d, 0xfe, 0x8c, 0x9e, 0xa5, 0x44, 0xc3, 0xd0, 0xfb, 0xe0,
0x8d, 0xbb, 0xf1, 0xdb, 0xb0, 0xa1, 0x7b, 0x12, 0x5c, 0xea, 0xdd, 0x8c, 0x68, 0x48, 0x2c, 0x89,
0xbf, 0xa0, 0x0e, 0xa1, 0x54, 0x81, 0xd6, 0xc3, 0x27, 0x57, 0x2e, 0x4d, 0x1d, 0x93, 0xd4, 0x70,
0xf0, 0xc7, 0x43, 0xaf, 0x9a, 0x0e, 0x74, 0x21, 0x85, 0x06, 0x3c, 0x43, 0x5d, 0x26, 0x98, 0xd9,
0x16, 0x44, 0x91, 0x5c, 0x9f, 0x9c, 0x7c, 0x6c, 0x8a, 0x9e, 0x97, 0x59, 0x08, 0x66, 0xd6, 0x16,
0x4c, 0x10, 0x3b, 0x3f, 0xe3, 0x09, 0x6a, 0x6b, 0x43, 0x4c, 0x59, 0x7b, 0x7a, 0x73, 0xd1, 0xd3,
0xc6, 0x22, 0xc9, 0x09, 0x0d, 0xfe, 0x7a, 0xe8, 0xfd, 0x42, 0x1c, 0x08, 0x67, 0x94, 0x18, 0x98,
0x4b, 0xce, 0x97, 0x60, 0xc8, 0x9c, 0x64, 0xfb, 0x07, 0xc4, 0xf3, 0x1a, 0x75, 0x68, 0xba, 0x15,
0x24, 0x07, 0x6b, 0xe5, 0x79, 0xd2, 0xa6, 0xe9, 0x8a, 0xe4, 0x80, 0x47, 0xe8, 0x45, 0x26, 0x39,
0x87, 0xcc, 0x30, 0x29, 0x1c, 0xf0, 0xd4, 0x02, 0xfd, 0x9b, 0xe3, 0x0a, 0x8c, 0xff, 0x79, 0xa8,
0xb7, 0xae, 0x1a, 0xdd, 0xb8, 0xca, 0x71, 0x86, 0x7a, 0xb7, 0x83, 0xc3, 0xa3, 0xf0, 0x6e, 0xfb,
0xe1, 0x85, 0x72, 0xfd, 0xf1, 0xfd, 0xa0, 0xeb, 0x20, 0x68, 0x61, 0x85, 0xde, 0x35, 0xb3, 0x70,
0x8e, 0xce, 0x89, 0xe0, 0xf8, 0x92, 0xd8, 0xf5, 0xf8, 0xfc, 0x6b, 0x35, 0x04, 0xad, 0xf8, 0xb7,
0x87, 0x5e, 0xda, 0x4d, 0x2b, 0x2f, 0xf5, 0xb6, 0x8f, 0x60, 0x64, 0x36, 0xfd, 0xf6, 0x75, 0xc7,
0xcc, 0xbe, 0x4c, 0xab, 0x37, 0xd1, 0x91, 0x71, 0xce, 0x8e, 0x06, 0xb2, 0x7d, 0xe4, 0x6e, 0x7d,
0xa2, 0x4c, 0x1b, 0xc5, 0xd2, 0xd2, 0x00, 0x8d, 0xea, 0x5f, 0x31, 0xb2, 0x52, 0x91, 0xfd, 0x7c,
0x91, 0xa6, 0x6d, 0x3b, 0x4e, 0xfe, 0x07, 0x00, 0x00, 0xff, 0xff, 0xa8, 0x2b, 0x26, 0x14, 0xaf,
0x03, 0x00, 0x00,
// 469 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x92, 0xcd, 0x6f, 0xd3, 0x40,
0x10, 0xc5, 0x13, 0x8a, 0x52, 0xb1, 0x8d, 0x0a, 0xda, 0x22, 0x51, 0x99, 0x0f, 0x81, 0x91, 0x68,
0x2f, 0xd8, 0x28, 0x95, 0xb8, 0xa2, 0x26, 0x42, 0xa8, 0x87, 0x56, 0x95, 0x53, 0x09, 0xa9, 0x97,
0x68, 0x6d, 0x8f, 0x92, 0x11, 0xfb, 0x61, 0x76, 0xc7, 0x11, 0xed, 0x85, 0x1b, 0x57, 0x2e, 0x1c,
0xf9, 0x63, 0x91, 0xd7, 0x76, 0xa8, 0x21, 0x84, 0x03, 0x07, 0x6e, 0x1e, 0xef, 0x6f, 0xde, 0xbe,
0x9d, 0x37, 0x6c, 0xaf, 0xb0, 0xe6, 0xd3, 0xd5, 0xcc, 0x81, 0x5d, 0x62, 0x06, 0x51, 0x61, 0x0d,
0x19, 0xce, 0x15, 0xca, 0x65, 0xe9, 0xea, 0x2a, 0xf2, 0x44, 0x30, 0xcc, 0x8c, 0x52, 0x46, 0xd7,
0xff, 0x82, 0x5d, 0xd4, 0x04, 0x56, 0x0b, 0xd9, 0xd4, 0xc3, 0x9b, 0x1d, 0xe1, 0x67, 0xb6, 0x97,
0xc0, 0x1c, 0x1d, 0x81, 0x3d, 0x33, 0x39, 0x24, 0xf0, 0xb1, 0x04, 0x47, 0xfc, 0x15, 0xbb, 0x9d,
0x0a, 0x07, 0xfb, 0xfd, 0xa7, 0xfd, 0xc3, 0x9d, 0xd1, 0xa3, 0xa8, 0x73, 0x4b, 0x23, 0x7f, 0xea,
0xe6, 0x63, 0xe1, 0x20, 0xf1, 0x24, 0x7f, 0xcd, 0xb6, 0x45, 0x9e, 0x5b, 0x70, 0x6e, 0xff, 0xd6,
0x86, 0xa6, 0xe3, 0x9a, 0x49, 0x5a, 0x38, 0xfc, 0xda, 0x67, 0xf7, 0xbb, 0x0e, 0x5c, 0x61, 0xb4,
0x03, 0x3e, 0x66, 0x3b, 0xa8, 0x91, 0x66, 0x85, 0xb0, 0x42, 0xb9, 0xc6, 0xc9, 0xb3, 0xae, 0xe8,
0xea, 0x69, 0x27, 0x1a, 0xe9, 0xdc, 0x83, 0x09, 0xc3, 0xd5, 0x37, 0x3f, 0x62, 0x03, 0x47, 0x82,
0xca, 0xd6, 0xd3, 0xc3, 0xb5, 0x9e, 0xa6, 0x1e, 0x49, 0x1a, 0x34, 0xfc, 0xd6, 0x67, 0x4f, 0x4e,
0xf4, 0x52, 0x48, 0xcc, 0x05, 0xc1, 0xc4, 0x48, 0x79, 0x0a, 0x24, 0x26, 0x22, 0x5b, 0xfc, 0xc3,
0x78, 0x1e, 0xb0, 0xed, 0x3c, 0x9d, 0x69, 0xa1, 0xc0, 0x5b, 0xb9, 0x93, 0x0c, 0xf2, 0xf4, 0x4c,
0x28, 0xe0, 0x07, 0xec, 0x6e, 0x66, 0xa4, 0x84, 0x8c, 0xd0, 0xe8, 0x1a, 0xd8, 0xf2, 0xc0, 0xee,
0xcf, 0xdf, 0x15, 0x38, 0xfa, 0xbe, 0xc5, 0x86, 0xe7, 0x55, 0xbe, 0xd3, 0x7a, 0x01, 0xf8, 0x7b,
0xc6, 0xdf, 0x01, 0x5d, 0xa0, 0x82, 0x0b, 0xcc, 0x3e, 0x4c, 0x16, 0x42, 0x6b, 0x90, 0x3c, 0x58,
0x6b, 0xe6, 0xad, 0x2a, 0xe8, 0x2a, 0x78, 0xde, 0x3d, 0x6b, 0x8a, 0x29, 0x59, 0xd4, 0xf3, 0x76,
0xee, 0x61, 0x8f, 0x5f, 0x7a, 0xe1, 0x89, 0x51, 0x85, 0xd1, 0xa0, 0xa9, 0x1a, 0x0f, 0xb8, 0x8d,
0xc2, 0x2f, 0xfe, 0x10, 0xcb, 0x2f, 0x1a, 0x61, 0x8f, 0x67, 0x6c, 0x78, 0x33, 0x6d, 0x7e, 0x10,
0xfd, 0xbe, 0xc0, 0xd1, 0x9a, 0x8d, 0x0c, 0x0e, 0xff, 0x0e, 0xae, 0x1e, 0x60, 0xd9, 0xe3, 0x6e,
0x80, 0xf5, 0x18, 0x57, 0x31, 0xf2, 0xd1, 0x3a, 0xb1, 0xcd, 0x99, 0x07, 0x9b, 0x76, 0x27, 0xec,
0x8d, 0xbe, 0xf4, 0xd9, 0x3d, 0x1f, 0x4f, 0xe5, 0xa5, 0x8d, 0xe8, 0x3f, 0x18, 0x19, 0x1f, 0x5f,
0xbe, 0x99, 0x23, 0x2d, 0xca, 0xb4, 0x3a, 0x89, 0xaf, 0x51, 0x4a, 0xbc, 0x26, 0xc8, 0x16, 0x71,
0xdd, 0xf5, 0x32, 0x47, 0x47, 0x16, 0xd3, 0x92, 0x20, 0x8f, 0xdb, 0xa0, 0x62, 0x2f, 0x15, 0xfb,
0xeb, 0x8b, 0x34, 0x1d, 0xf8, 0xf2, 0xe8, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd8, 0xd8, 0x3f,
0xb5, 0x72, 0x04, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -225,6 +230,8 @@ const _ = grpc.SupportPackageIsVersion4
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ProxyServiceClient interface {
GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error)
RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error)
InvalidateCollectionMetaCache(ctx context.Context, in *InvalidateCollMetaCacheRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
}
@ -237,6 +244,24 @@ func NewProxyServiceClient(cc *grpc.ClientConn) ProxyServiceClient {
return &proxyServiceClient{cc}
}
func (c *proxyServiceClient) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) {
out := new(milvuspb.StringResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/GetTimeTickChannel", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *proxyServiceClient) GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error) {
out := new(internalpb2.ComponentStates)
err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/GetComponentStates", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *proxyServiceClient) RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) {
out := new(RegisterNodeResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/RegisterNode", in, out, opts...)
@ -257,6 +282,8 @@ func (c *proxyServiceClient) InvalidateCollectionMetaCache(ctx context.Context,
// ProxyServiceServer is the server API for ProxyService service.
type ProxyServiceServer interface {
GetTimeTickChannel(context.Context, *commonpb.Empty) (*milvuspb.StringResponse, error)
GetComponentStates(context.Context, *commonpb.Empty) (*internalpb2.ComponentStates, error)
RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error)
InvalidateCollectionMetaCache(context.Context, *InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
@ -265,6 +292,12 @@ type ProxyServiceServer interface {
type UnimplementedProxyServiceServer struct {
}
func (*UnimplementedProxyServiceServer) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetTimeTickChannel not implemented")
}
func (*UnimplementedProxyServiceServer) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetComponentStates not implemented")
}
func (*UnimplementedProxyServiceServer) RegisterNode(ctx context.Context, req *RegisterNodeRequest) (*RegisterNodeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RegisterNode not implemented")
}
@ -276,6 +309,42 @@ func RegisterProxyServiceServer(s *grpc.Server, srv ProxyServiceServer) {
s.RegisterService(&_ProxyService_serviceDesc, srv)
}
func _ProxyService_GetTimeTickChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(commonpb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ProxyServiceServer).GetTimeTickChannel(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.proxy.ProxyService/GetTimeTickChannel",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ProxyServiceServer).GetTimeTickChannel(ctx, req.(*commonpb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _ProxyService_GetComponentStates_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(commonpb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ProxyServiceServer).GetComponentStates(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.proxy.ProxyService/GetComponentStates",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ProxyServiceServer).GetComponentStates(ctx, req.(*commonpb.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _ProxyService_RegisterNode_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RegisterNodeRequest)
if err := dec(in); err != nil {
@ -316,6 +385,14 @@ var _ProxyService_serviceDesc = grpc.ServiceDesc{
ServiceName: "milvus.proto.proxy.ProxyService",
HandlerType: (*ProxyServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetTimeTickChannel",
Handler: _ProxyService_GetTimeTickChannel_Handler,
},
{
MethodName: "GetComponentStates",
Handler: _ProxyService_GetComponentStates_Handler,
},
{
MethodName: "RegisterNode",
Handler: _ProxyService_RegisterNode_Handler,

View File

@ -1,7 +1,9 @@
package proxyservice
import (
"sync"
"context"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -14,21 +16,23 @@ type NodeIDAllocator interface {
}
type NaiveNodeIDAllocatorImpl struct {
mtx sync.Mutex
now UniqueID
impl *allocator.IDAllocator
}
func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
allocator.mtx.Lock()
defer func() {
allocator.now++
allocator.mtx.Unlock()
}()
return allocator.now
id, err := allocator.impl.AllocOne()
if err != nil {
panic(err)
}
return id
}
func NewNodeIDAllocator() NodeIDAllocator {
impl, err := allocator.NewIDAllocator(context.Background(), Params.MasterAddress())
if err != nil {
panic(err)
}
return &NaiveNodeIDAllocatorImpl{
now: 0,
impl: impl,
}
}

View File

@ -24,6 +24,14 @@ func (pt *ParamTable) PulsarAddress() string {
return ret
}
func (pt *ParamTable) MasterAddress() string {
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
return ret
}
func (pt *ParamTable) NodeTimeTickChannel() []string {
prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
if err != nil {