Refactor masterservice main

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2021-01-29 18:34:12 +08:00 committed by yefu.chen
parent bfb6766f7c
commit ee378f3a93
2 changed files with 139 additions and 93 deletions

View File

@ -0,0 +1,132 @@
package components
import (
"context"
"fmt"
"log"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type MasterService struct {
ctx context.Context
svr *msc.GrpcServer
proxyService *psc.Client
dataService *dsc.Client
indexService *isc.Client
}
func NewMasterService(ctx context.Context) (*MasterService, error) {
const reTryCnt = 3
svr, err := msc.NewGrpcServer(ctx)
if err != nil {
return nil, err
}
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port)
cnt := 0
ps.Params.Init()
log.Printf("proxy service address : %s", ps.Params.ServiceAddress)
proxyService := psc.NewClient(ps.Params.ServiceAddress)
if err = proxyService.Init(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
pxStates, err := proxyService.GetComponentStates()
if err != nil {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error())
continue
}
if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason)
continue
}
if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if err = svr.SetProxyService(proxyService); err != nil {
panic(err)
}
ds.Params.Init()
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("retry cout = %d, error = %s", cnt, dsStates.Status.Reason)
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= reTryCnt {
panic("connect to data service failed")
}
if err = svr.SetDataService(dataService); err != nil {
panic(err)
}
is.Params.Init()
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err = svr.SetIndexService(indexService); err != nil {
return nil, err
}
return &MasterService{
ctx: ctx,
svr: svr,
proxyService: proxyService,
dataService: dataService,
indexService: indexService,
}, nil
}
func (m *MasterService) Run() error {
if err := m.svr.Init(); err != nil {
return err
}
if err := m.svr.Start(); err != nil {
return err
}
return nil
}
func (m *MasterService) Stop() error {
_ = m.proxyService.Stop()
_ = m.indexService.Stop()
_ = m.dataService.Stop()
return m.svr.Stop()
}

View File

@ -2,110 +2,23 @@ package main
import ( import (
"context" "context"
"fmt"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
) )
const reTryCnt = 3
func main() { func main() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
svr, err := msc.NewGrpcServer(ctx) ms, err := distributed.NewMasterService(ctx)
if err != nil { if err != nil {
panic(err) panic(err)
} }
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) if err = ms.Run(); err != nil {
cnt := 0
ps.Params.Init()
log.Printf("proxy service address : %s", ps.Params.ServiceAddress)
proxyService := psc.NewClient(ps.Params.ServiceAddress)
if err = proxyService.Init(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
pxStates, err := proxyService.GetComponentStates()
if err != nil {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error())
continue
}
if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason)
continue
}
if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if err = svr.SetProxyService(proxyService); err != nil {
panic(err)
}
ds.Params.Init()
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("retry cout = %d, error = %s", cnt, dsStates.Status.Reason)
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= reTryCnt {
panic("connect to data service failed")
}
if err = svr.SetDataService(dataService); err != nil {
panic(err)
}
is.Params.Init()
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err = svr.SetIndexService(indexService); err != nil {
panic(err)
}
if err = svr.Init(); err != nil {
panic(err)
}
if err = svr.Start(); err != nil {
panic(err) panic(err)
} }
@ -117,7 +30,8 @@ func main() {
syscall.SIGQUIT) syscall.SIGQUIT)
sig := <-sc sig := <-sc
log.Printf("Got %s signal to exit", sig.String()) log.Printf("Got %s signal to exit", sig.String())
_ = indexService.Stop() err = ms.Stop()
_ = dataService.Stop() if err != nil {
_ = svr.Stop() panic(err)
}
} }