Refactor the client interface of proxy service

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/4973/head^2
dragondriver 2021-01-29 17:29:31 +08:00 committed by yefu.chen
parent 99cef4b5c8
commit f660caca0c
14 changed files with 208 additions and 96 deletions

View File

@ -124,8 +124,6 @@ indexservice: build-cpp
# Builds various components locally.
build-go: build-cpp
@echo "Building each component's binary to './bin'"
@echo "Building master ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
@echo "Building masterservice ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/masterservice $(PWD)/cmd/masterservice/main.go 1>/dev/null
@echo "Building proxy service ..."
@ -137,8 +135,6 @@ build-go: build-cpp
@echo "Building query service ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null
@echo "Building query node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null
@echo "Building write node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/writenode $(PWD)/cmd/writenode/writenode.go 1>/dev/null
@echo "Building binlog ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null

View File

@ -12,7 +12,8 @@ import (
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -33,9 +34,9 @@ func main() {
cnt := 0
psc.Params.Init()
log.Printf("proxy service address : %s", psc.Params.ServiceAddress)
proxyService := psc.NewClient(psc.Params.ServiceAddress)
ps.Params.Init()
log.Printf("proxy service address : %s", ps.Params.ServiceAddress)
proxyService := psc.NewClient(ps.Params.ServiceAddress)
if err = proxyService.Init(); err != nil {
panic(err)
}

View File

@ -29,6 +29,7 @@ func main() {
var sig os.Signal
go func() {
sig = <-sc
log.Println("receive stop signal ...")
cancel()
}()

View File

@ -29,6 +29,7 @@ func main() {
var sig os.Signal
go func() {
sig = <-sc
log.Println("receive stop signal")
cancel()
}()

View File

@ -1,4 +1,4 @@
package grpcproxynode
package grpcproxynodeclient
import (
"context"

View File

@ -9,6 +9,8 @@ import (
"sync"
"time"
grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -18,7 +20,6 @@ import (
grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@ -38,7 +39,7 @@ type Server struct {
port int
//todo
proxyServiceClient *grpcproxyservice.Client
proxyServiceClient *grpcproxyserviceclient.Client
// todo InitParams Service addrs
masterServiceClient *grcpmasterservice.GrpcClient
@ -91,12 +92,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
func (s *Server) Run() error {
if err := s.init(); err != nil {
return nil
return err
}
log.Println("proxy node init done ...")
if err := s.start(); err != nil {
return err
}
log.Println("proxy node start done ...")
return nil
}
@ -113,6 +116,10 @@ func (s *Server) init() error {
Params.Port = funcutil.GetAvailablePort()
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
log.Println("proxy host: ", Params.IP)
log.Println("proxy port: ", Params.Port)
log.Println("proxy address: ", Params.Address)
defer func() {
if err != nil {
err2 := s.Stop()
@ -126,18 +133,21 @@ func (s *Server) init() error {
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
err = <-s.grpcErrChan
log.Println("create grpc server ...")
if err != nil {
return err
}
s.proxyServiceClient = grpcproxyservice.NewClient(Params.ProxyServiceAddress)
s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
err = s.proxyServiceClient.Init()
if err != nil {
return err
}
s.impl.SetProxyServiceClient(s.proxyServiceClient)
log.Println("set proxy service client ...")
masterServiceAddr := Params.MasterAddress
log.Println("master address: ", masterServiceAddr)
timeout := 3 * time.Second
s.masterServiceClient, err = grcpmasterservice.NewGrpcClient(masterServiceAddr, timeout)
if err != nil {
@ -148,33 +158,40 @@ func (s *Server) init() error {
return err
}
s.impl.SetMasterClient(s.masterServiceClient)
log.Println("set master client ...")
dataServiceAddr := Params.DataServiceAddress
log.Println("data service address ...")
s.dataServiceClient = grpcdataservice.NewClient(dataServiceAddr)
err = s.dataServiceClient.Init()
if err != nil {
return err
}
s.impl.SetDataServiceClient(s.dataServiceClient)
log.Println("set data service address ...")
indexServiceAddr := Params.IndexServerAddress
log.Println("index server address: ", indexServiceAddr)
s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
err = s.indexServiceClient.Init()
if err != nil {
return err
}
s.impl.SetIndexServiceClient(s.indexServiceClient)
log.Println("set index service client ...")
queryServiceAddr := Params.QueryServiceAddress
s.queryServiceClient = grpcqueryserviceclient.NewClient(queryServiceAddr)
err = s.queryServiceClient.Init()
if err != nil {
return err
}
s.impl.SetQueryServiceClient(s.queryServiceClient)
// queryServiceAddr := Params.QueryServiceAddress
// log.Println("query service address: ", queryServiceAddr)
// s.queryServiceClient = grpcqueryserviceclient.NewClient(queryServiceAddr)
// err = s.queryServiceClient.Init()
// if err != nil {
// return err
// }
// s.impl.SetQueryServiceClient(s.queryServiceClient)
// log.Println("set query service client ...")
proxynode.Params.Init()
log.Println("init params done ...")
proxynode.Params.NetworkPort = Params.Port
proxynode.Params.IP = Params.IP
proxynode.Params.NetworkAddress = Params.Address
@ -184,6 +201,7 @@ func (s *Server) init() error {
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
if err := s.impl.Init(); err != nil {
log.Println("impl init error: ", err)
return err
}

View File

@ -1,4 +1,4 @@
package grpcproxyservice
package grpcproxyserviceclient
import (
"context"

View File

@ -2,7 +2,6 @@ package grpcproxyservice
import (
"context"
"fmt"
"log"
"net"
"strconv"
@ -18,8 +17,9 @@ import (
)
type Server struct {
ctx context.Context
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
grpcServer *grpc.Server
grpcErrChan chan error
@ -27,10 +27,12 @@ type Server struct {
impl *proxyservice.ServiceImpl
}
func NewServer(ctx context.Context) (*Server, error) {
func NewServer(ctx1 context.Context) (*Server, error) {
ctx, cancel := context.WithCancel(ctx1)
server := &Server{
ctx: ctx,
cancel: cancel,
grpcErrChan: make(chan error),
}
@ -47,6 +49,7 @@ func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
log.Println("proxy service init done ...")
if err := s.start(); err != nil {
return err
@ -57,6 +60,7 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
proxyservice.Params.Init()
log.Println("init params done")
s.wg.Add(1)
go s.startGrpcLoop(Params.ServicePort)
@ -65,6 +69,7 @@ func (s *Server) init() error {
return err
}
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
log.Println("grpc init done ...")
if err := s.impl.Init(); err != nil {
return err
@ -76,7 +81,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
fmt.Println("network port: ", grpcPort)
log.Println("network port: ", grpcPort)
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Printf("GrpcServer:failed to listen: %v", err)
@ -99,7 +104,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
func (s *Server) start() error {
fmt.Println("proxy ServiceImpl start ...")
log.Println("proxy ServiceImpl start ...")
if err := s.impl.Start(); err != nil {
return err
}
@ -107,10 +112,14 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
s.impl.Stop()
err := s.impl.Stop()
if err != nil {
return err
}
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
s.cancel()
s.wg.Wait()
return nil
}

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/spf13/cast"
"github.com/spf13/viper"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -58,44 +59,50 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
config := viper.New()
config.SetConfigType("yaml")
save := func() error {
for _, key := range config.AllKeys() {
val := config.Get(key)
str, err := cast.ToStringE(val)
if err != nil {
switch val := val.(type) {
case []interface{}:
str = str[:0]
for _, v := range val {
ss, err := cast.ToStringE(v)
if err != nil {
log.Panic(err)
}
if len(str) == 0 {
str = ss
} else {
str = str + "," + ss
}
}
default:
log.Panicf("undefine config type, key=%s", key)
}
}
log.Println("key: ", key, ", value: ", str)
err = pt.Save(key, str)
if err != nil {
panic(err)
}
}
return nil
}
for _, pair := range initParams.StartParams {
if pair.Key == StartParamsKey {
if strings.HasPrefix(pair.Key, StartParamsKey) {
err := config.ReadConfig(bytes.NewBuffer([]byte(pair.Value)))
if err != nil {
return err
}
break
}
}
for _, key := range config.AllKeys() {
val := config.Get(key)
str, err := cast.ToStringE(val)
if err != nil {
switch val := val.(type) {
case []interface{}:
str = str[:0]
for _, v := range val {
ss, err := cast.ToStringE(v)
if err != nil {
log.Panic(err)
}
if len(str) == 0 {
str = ss
} else {
str = str + "," + ss
}
}
default:
log.Panicf("undefine config type, key=%s", key)
err = save()
if err != nil {
return err
}
}
err = pt.Save(key, str)
if err != nil {
panic(err)
}
}
pt.initParams()
@ -194,14 +201,18 @@ func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParam
func (pt *ParamTable) Init() {
pt.BaseTable.Init()
pt.initParams()
// err := pt.LoadYaml("advanced/proxy_node.yaml")
// if err != nil {
// panic(err)
// }
// pt.initParams()
}
func (pt *ParamTable) initParams() {
pt.initPulsarAddress()
pt.initQueryNodeIDList()
pt.initQueryNodeNum()
pt.initProxyID()
// pt.initProxyID()
pt.initTimeTickInterval()
pt.initInsertChannelNames()
pt.initDeleteChannelNames()
@ -264,11 +275,11 @@ func (pt *ParamTable) initProxyID() {
}
func (pt *ParamTable) initTimeTickInterval() {
internalStr, err := pt.Load("proxyNode.timeTickInterval")
intervalStr, err := pt.Load("proxyNode.timeTickInterval")
if err != nil {
panic(err)
}
interval, err := strconv.Atoi(internalStr)
interval, err := strconv.Atoi(intervalStr)
if err != nil {
panic(err)
}
@ -371,11 +382,12 @@ func (pt *ParamTable) initProxySubName() {
if err != nil {
panic(err)
}
proxyIDStr, err := pt.Load("_proxyID")
if err != nil {
panic(err)
}
pt.ProxySubName = prefix + "-" + proxyIDStr
pt.ProxySubName = prefix
// proxyIDStr, err := pt.Load("_proxyID")
// if err != nil {
// panic(err)
// }
pt.ProxySubName = prefix + "-" + strconv.Itoa(int(pt.ProxyID))
}
func (pt *ParamTable) initProxyTimeTickChannelNames() {

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log"
"math/rand"
"sync"
"time"
@ -112,6 +113,7 @@ func (node *NodeImpl) Init() error {
if err != nil {
return err
}
log.Println("service was ready ...")
request := &proxypb.RegisterNodeRequest{
Address: &commonpb.Address{
@ -171,6 +173,7 @@ func (node *NodeImpl) Init() error {
}
node.UpdateStateCode(internalpb2.StateCode_HEALTHY)
log.Println("proxy node is healthy ...")
// todo
//Params.InsertChannelNames, err = node.dataServiceClient.GetInsertChannels()
@ -196,6 +199,7 @@ func (node *NodeImpl) Init() error {
node.queryMsgStream = pulsarms.NewPulsarMsgStream(node.ctx, Params.MsgStreamSearchBufSize)
node.queryMsgStream.SetPulsarClient(pulsarAddress)
node.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames)
log.Println("create query message stream ...")
masterAddr := Params.MasterAddress
idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr)
@ -227,6 +231,7 @@ func (node *NodeImpl) Init() error {
return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
}
node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
log.Println("create manipulation message stream ...")
node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator)
if err != nil {
@ -240,13 +245,28 @@ func (node *NodeImpl) Init() error {
func (node *NodeImpl) Start() error {
initGlobalMetaCache(node.ctx, node)
log.Println("init global meta cache ...")
node.manipulationMsgStream.Start()
log.Println("start manipulation message stream ...")
node.queryMsgStream.Start()
log.Println("start query message stream ...")
node.sched.Start()
log.Println("start scheduler ...")
node.idAllocator.Start()
log.Println("start id allocator ...")
node.tsoAllocator.Start()
log.Println("start tso allocator ...")
node.segAssigner.Start()
log.Println("start seg assigner ...")
node.tick.Start()
log.Println("start time tick ...")
// Start callbacks
for _, cb := range node.startCallbacks {

View File

@ -53,7 +53,7 @@ func (s *ServiceImpl) fillNodeInitParams() error {
if err != nil {
panic(err)
}
return data
return append(data, []byte("\n")...)
}
channelYamlContent := getConfigContentByName(ChannelYamlContent)
@ -65,20 +65,35 @@ func (s *ServiceImpl) fillNodeInitParams() error {
writeNodeYamlContent := getConfigContentByName(WriteNodeYamlContent)
milvusYamlContent := getConfigContentByName(MilvusYamlContent)
var allContent []byte
allContent = append(allContent, channelYamlContent...)
allContent = append(allContent, commonYamlContent...)
allContent = append(allContent, dataNodeYamlContent...)
allContent = append(allContent, masterYamlContent...)
allContent = append(allContent, proxyNodeYamlContent...)
allContent = append(allContent, queryNodeYamlContent...)
allContent = append(allContent, writeNodeYamlContent...)
allContent = append(allContent, milvusYamlContent...)
appendContent := func(key string, content []byte) {
s.nodeStartParams = append(s.nodeStartParams, &commonpb.KeyValuePair{
Key: StartParamsKey + "_" + key,
Value: string(content),
})
}
appendContent(ChannelYamlContent, channelYamlContent)
appendContent(CommonYamlContent, commonYamlContent)
appendContent(DataNodeYamlContent, dataNodeYamlContent)
appendContent(MasterYamlContent, masterYamlContent)
appendContent(ProxyNodeYamlContent, proxyNodeYamlContent)
appendContent(QueryNodeYamlContent, queryNodeYamlContent)
appendContent(WriteNodeYamlContent, writeNodeYamlContent)
appendContent(MilvusYamlContent, milvusYamlContent)
s.nodeStartParams = append(s.nodeStartParams, &commonpb.KeyValuePair{
Key: StartParamsKey,
Value: string(allContent),
})
// var allContent []byte
// allContent = append(allContent, channelYamlContent...)
// allContent = append(allContent, commonYamlContent...)
// allContent = append(allContent, dataNodeYamlContent...)
// allContent = append(allContent, masterYamlContent...)
// allContent = append(allContent, proxyNodeYamlContent...)
// allContent = append(allContent, queryNodeYamlContent...)
// allContent = append(allContent, writeNodeYamlContent...)
// allContent = append(allContent, milvusYamlContent...)
// s.nodeStartParams = append(s.nodeStartParams, &commonpb.KeyValuePair{
// Key: StartParamsKey,
// Value: string(allContent),
// })
return nil
}
@ -89,10 +104,12 @@ func (s *ServiceImpl) Init() error {
if err != nil {
return err
}
log.Println("fill node init params ...")
serviceTimeTickMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024)
serviceTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
serviceTimeTickMsgStream.CreatePulsarProducers([]string{Params.ServiceTimeTickChannel})
log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
nodeTimeTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024)
nodeTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
@ -100,9 +117,12 @@ func (s *ServiceImpl) Init() error {
"proxyservicesub", // TODO: add config
util.NewUnmarshalDispatcher(),
1024)
log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)
ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10)
log.Println("create soft time tick barrier ...")
s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream)
log.Println("create time tick ...")
s.stateCode = internalpb2.StateCode_HEALTHY
@ -111,12 +131,33 @@ func (s *ServiceImpl) Init() error {
func (s *ServiceImpl) Start() error {
s.sched.Start()
log.Println("start scheduler ...")
return s.tick.Start()
}
func (s *ServiceImpl) Stop() error {
s.sched.Close()
log.Println("close scheduler ...")
s.tick.Close()
log.Println("close time tick")
var err error
nodeClients, err := s.nodeInfos.ObtainAllClients()
if err != nil {
panic(err)
return err
}
for _, nodeClient := range nodeClients {
err = nodeClient.Stop()
if err != nil {
panic(err)
return err
}
}
log.Println("stop all node clients ...")
s.cancel()
return nil
}
@ -238,7 +279,7 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC
var err error
err = s.sched.RegisterNodeTaskQueue.Enqueue(t)
err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t)
if err != nil {
return err
}

View File

@ -1,11 +1,15 @@
package proxyservice
import (
"fmt"
"context"
"log"
"math/rand"
"strconv"
"sync"
"time"
grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
@ -16,18 +20,14 @@ type NodeInfo struct {
port int64
}
// TODO: replace as real node client impl
type NodeClient interface {
Init() error
Start() error
Stop() error
InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
}
type FakeNodeClient struct {
}
func (c *FakeNodeClient) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
panic("implement me")
}
type GlobalNodeInfoTable struct {
mtx sync.RWMutex
nodeIDs []UniqueID
@ -86,9 +86,17 @@ func (table *GlobalNodeInfoTable) createClients() error {
for nodeID, info := range table.infos {
_, ok := table.clients[nodeID]
if !ok {
// TODO: use info to create client
fmt.Println(info)
table.clients[nodeID] = &FakeNodeClient{}
log.Println(info)
table.clients[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port)))
var err error
err = table.clients[nodeID].Init()
if err != nil {
return err
}
err = table.clients[nodeID].Start()
if err != nil {
return err
}
}
}

View File

@ -116,6 +116,10 @@ func (t *RegisterNodeTask) Execute() error {
err := t.nodeInfos.Register(nodeID, &info)
// TODO: fill init params
t.response = &proxypb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
},
InitParams: &internalpb2.InitParams{
NodeID: nodeID,
StartParams: t.startParams,

View File

@ -28,6 +28,7 @@ type (
)
func (tt *TimeTickImpl) Start() error {
log.Println("start time tick ...")
tt.wg.Add(1)
go func() {
defer tt.wg.Done()