mirror of https://github.com/milvus-io/milvus.git
parent
4f2a1e7912
commit
5658f779fe
|
@ -1,50 +1,14 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/roles"
|
||||
)
|
||||
|
||||
type MilvusRoles struct {
|
||||
EnableMaster bool `env:"ENABLE_MASTER"`
|
||||
EnableProxyService bool `env:"ENABLE_PROXY_SERVICE"`
|
||||
EnableProxyNode bool `env:"ENABLE_PROXY_NODE"`
|
||||
EnableQueryService bool `env:"ENABLE_QUERY_SERVICE"`
|
||||
EnableQueryNode bool `env:"ENABLE_QUERY_NODE"`
|
||||
EnableDataService bool `env:"ENABLE_DATA_SERVICE"`
|
||||
EnableDataNode bool `env:"ENABLE_DATA_NODE"`
|
||||
EnableIndexService bool `env:"ENABLE_INDEX_SERVICE"`
|
||||
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
|
||||
EnableMsgStreamService bool `env:"ENABLE_MSGSTREAM_SERVICE"`
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) hasAnyRole() bool {
|
||||
return mr.EnableMaster || mr.EnableMsgStreamService ||
|
||||
mr.EnableProxyService || mr.EnableProxyNode ||
|
||||
mr.EnableQueryService || mr.EnableQueryNode ||
|
||||
mr.EnableDataService || mr.EnableDataNode ||
|
||||
mr.EnableIndexService || mr.EnableIndexNode
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) envValue(env string) bool {
|
||||
env = strings.ToLower(env)
|
||||
env = strings.Trim(env, " ")
|
||||
if env == "1" || env == "true" {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func main() {
|
||||
var roles MilvusRoles
|
||||
|
||||
func initRoles(roles *roles.MilvusRoles) {
|
||||
flag.BoolVar(&roles.EnableMaster, "master-service", false, "start as master service")
|
||||
flag.BoolVar(&roles.EnableProxyService, "proxy-service", false, "start as proxy service")
|
||||
flag.BoolVar(&roles.EnableProxyNode, "proxy-node", false, "start as proxy node")
|
||||
|
@ -57,251 +21,39 @@ func main() {
|
|||
flag.BoolVar(&roles.EnableMsgStreamService, "msg-stream", false, "start as msg stream service")
|
||||
flag.Parse()
|
||||
|
||||
if !roles.hasAnyRole() {
|
||||
if !roles.HasAnyRole() {
|
||||
for _, e := range os.Environ() {
|
||||
pairs := strings.SplitN(e, "=", 2)
|
||||
if len(pairs) == 2 {
|
||||
switch pairs[0] {
|
||||
case "ENABLE_MASTER":
|
||||
roles.EnableMaster = roles.envValue(pairs[1])
|
||||
roles.EnableMaster = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_PROXY_SERVICE":
|
||||
roles.EnableProxyService = roles.envValue(pairs[1])
|
||||
roles.EnableProxyService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_PROXY_NODE":
|
||||
roles.EnableProxyNode = roles.envValue(pairs[1])
|
||||
roles.EnableProxyNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_QUERY_SERVICE":
|
||||
roles.EnableQueryService = roles.envValue(pairs[1])
|
||||
roles.EnableQueryService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_QUERY_NODE":
|
||||
roles.EnableQueryNode = roles.envValue(pairs[1])
|
||||
roles.EnableQueryNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_DATA_SERVICE":
|
||||
roles.EnableDataService = roles.envValue(pairs[1])
|
||||
roles.EnableDataService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_DATA_NODE":
|
||||
roles.EnableDataNode = roles.envValue(pairs[1])
|
||||
roles.EnableDataNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_INDEX_SERVICE":
|
||||
roles.EnableIndexService = roles.envValue(pairs[1])
|
||||
roles.EnableIndexService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_INDEX_NODE":
|
||||
roles.EnableIndexNode = roles.envValue(pairs[1])
|
||||
roles.EnableIndexNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_MSGSTREAM_SERVICE":
|
||||
roles.EnableMsgStreamService = roles.envValue(pairs[1])
|
||||
roles.EnableMsgStreamService = roles.EnvValue(pairs[1])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !roles.hasAnyRole() {
|
||||
log.Printf("set the roles please ...")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var masterService *components.MasterService
|
||||
if roles.EnableMaster {
|
||||
log.Print("start as master service")
|
||||
go func() {
|
||||
var err error
|
||||
masterService, err = components.NewMasterService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = masterService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var proxyService *components.ProxyService
|
||||
if roles.EnableProxyService {
|
||||
log.Print("start as proxy service")
|
||||
go func() {
|
||||
var err error
|
||||
proxyService, err = components.NewProxyService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = proxyService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var proxyNode *components.ProxyNode
|
||||
if roles.EnableProxyNode {
|
||||
log.Print("start as proxy node")
|
||||
go func() {
|
||||
var err error
|
||||
proxyNode, err = components.NewProxyNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = proxyNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var queryService *components.QueryService
|
||||
if roles.EnableQueryService {
|
||||
log.Print("start as query service")
|
||||
go func() {
|
||||
var err error
|
||||
queryService, err = components.NewQueryService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = queryService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var queryNode *components.QueryNode
|
||||
if roles.EnableQueryNode {
|
||||
log.Print("start as query node")
|
||||
go func() {
|
||||
var err error
|
||||
queryNode, err = components.NewQueryNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = queryNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var dataService *components.DataService
|
||||
if roles.EnableDataService {
|
||||
log.Print("start as data service")
|
||||
go func() {
|
||||
var err error
|
||||
dataService, err = components.NewDataService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = dataService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var dataNode *components.DataNode
|
||||
if roles.EnableDataNode {
|
||||
log.Print("start as data node")
|
||||
go func() {
|
||||
var err error
|
||||
dataNode, err = components.NewDataNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = dataNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var indexService *components.IndexService
|
||||
if roles.EnableIndexService {
|
||||
log.Print("start as index service")
|
||||
go func() {
|
||||
var err error
|
||||
indexService, err = components.NewIndexService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = indexService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var indexNode *components.IndexNode
|
||||
if roles.EnableIndexNode {
|
||||
log.Print("start as index node")
|
||||
go func() {
|
||||
var err error
|
||||
indexNode, err = components.NewIndexNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = indexNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var msgStream *components.MsgStream
|
||||
if roles.EnableMsgStreamService {
|
||||
log.Print("start as msg stream service")
|
||||
go func() {
|
||||
var err error
|
||||
msgStream, err = components.NewMsgStreamService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = msgStream.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
sig := <-sc
|
||||
log.Printf("Get %s signal to exit", sig.String())
|
||||
|
||||
if roles.EnableMaster {
|
||||
if masterService != nil {
|
||||
_ = masterService.Stop()
|
||||
}
|
||||
log.Printf("exit master service")
|
||||
}
|
||||
|
||||
if roles.EnableProxyService {
|
||||
if proxyService != nil {
|
||||
_ = proxyService.Stop()
|
||||
}
|
||||
log.Printf("exit proxy service")
|
||||
}
|
||||
|
||||
if roles.EnableProxyNode {
|
||||
if proxyNode != nil {
|
||||
_ = proxyNode.Stop()
|
||||
}
|
||||
log.Printf("exit proxy node")
|
||||
}
|
||||
|
||||
if roles.EnableQueryService {
|
||||
if queryService != nil {
|
||||
_ = queryService.Stop()
|
||||
}
|
||||
log.Printf("exit query service")
|
||||
}
|
||||
|
||||
if roles.EnableQueryNode {
|
||||
if queryNode != nil {
|
||||
_ = queryNode.Stop()
|
||||
}
|
||||
log.Printf("exit query node")
|
||||
}
|
||||
|
||||
if roles.EnableDataService {
|
||||
if dataService != nil {
|
||||
_ = dataService.Stop()
|
||||
}
|
||||
log.Printf("exit data service")
|
||||
}
|
||||
|
||||
if roles.EnableDataNode {
|
||||
if dataNode != nil {
|
||||
_ = dataNode.Stop()
|
||||
}
|
||||
log.Printf("exit data node")
|
||||
}
|
||||
|
||||
if roles.EnableIndexService {
|
||||
if indexService != nil {
|
||||
_ = indexService.Stop()
|
||||
}
|
||||
log.Printf("exit index service")
|
||||
}
|
||||
|
||||
if roles.EnableIndexNode {
|
||||
if indexNode != nil {
|
||||
_ = indexNode.Stop()
|
||||
}
|
||||
log.Printf("exit index node")
|
||||
}
|
||||
|
||||
if roles.EnableMsgStreamService {
|
||||
if msgStream != nil {
|
||||
_ = msgStream.Stop()
|
||||
}
|
||||
log.Printf("exit msg stream service")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
var roles roles.MilvusRoles
|
||||
initRoles(&roles)
|
||||
roles.Run()
|
||||
}
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRoles(t *testing.T) {
|
||||
r := MilvusRoles{}
|
||||
|
||||
assert.True(t, r.envValue("1"))
|
||||
assert.True(t, r.envValue(" 1 "))
|
||||
assert.True(t, r.envValue("True"))
|
||||
assert.True(t, r.envValue(" True "))
|
||||
assert.True(t, r.envValue(" TRue "))
|
||||
assert.False(t, r.envValue("0"))
|
||||
assert.False(t, r.envValue(" 0 "))
|
||||
assert.False(t, r.envValue(" false "))
|
||||
assert.False(t, r.envValue(" False "))
|
||||
assert.False(t, r.envValue(" abc "))
|
||||
|
||||
ss := strings.SplitN("abcdef", "=", 2)
|
||||
assert.Equal(t, len(ss), 1)
|
||||
ss = strings.SplitN("adb=def", "=", 2)
|
||||
assert.Equal(t, len(ss), 2)
|
||||
}
|
|
@ -0,0 +1,261 @@
|
|||
package roles
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
|
||||
)
|
||||
|
||||
type MilvusRoles struct {
|
||||
EnableMaster bool `env:"ENABLE_MASTER"`
|
||||
EnableProxyService bool `env:"ENABLE_PROXY_SERVICE"`
|
||||
EnableProxyNode bool `env:"ENABLE_PROXY_NODE"`
|
||||
EnableQueryService bool `env:"ENABLE_QUERY_SERVICE"`
|
||||
EnableQueryNode bool `env:"ENABLE_QUERY_NODE"`
|
||||
EnableDataService bool `env:"ENABLE_DATA_SERVICE"`
|
||||
EnableDataNode bool `env:"ENABLE_DATA_NODE"`
|
||||
EnableIndexService bool `env:"ENABLE_INDEX_SERVICE"`
|
||||
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
|
||||
EnableMsgStreamService bool `env:"ENABLE_MSGSTREAM_SERVICE"`
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) HasAnyRole() bool {
|
||||
return mr.EnableMaster || mr.EnableMsgStreamService ||
|
||||
mr.EnableProxyService || mr.EnableProxyNode ||
|
||||
mr.EnableQueryService || mr.EnableQueryNode ||
|
||||
mr.EnableDataService || mr.EnableDataNode ||
|
||||
mr.EnableIndexService || mr.EnableIndexNode
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) EnvValue(env string) bool {
|
||||
env = strings.ToLower(env)
|
||||
env = strings.Trim(env, " ")
|
||||
if env == "1" || env == "true" {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) Run() {
|
||||
if !mr.HasAnyRole() {
|
||||
log.Printf("set the roles please ...")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var masterService *components.MasterService
|
||||
if mr.EnableMaster {
|
||||
log.Print("start as master service")
|
||||
go func() {
|
||||
var err error
|
||||
masterService, err = components.NewMasterService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = masterService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var proxyService *components.ProxyService
|
||||
if mr.EnableProxyService {
|
||||
log.Print("start as proxy service")
|
||||
go func() {
|
||||
var err error
|
||||
proxyService, err = components.NewProxyService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = proxyService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var proxyNode *components.ProxyNode
|
||||
if mr.EnableProxyNode {
|
||||
log.Print("start as proxy node")
|
||||
go func() {
|
||||
var err error
|
||||
proxyNode, err = components.NewProxyNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = proxyNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var queryService *components.QueryService
|
||||
if mr.EnableQueryService {
|
||||
log.Print("start as query service")
|
||||
go func() {
|
||||
var err error
|
||||
queryService, err = components.NewQueryService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = queryService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var queryNode *components.QueryNode
|
||||
if mr.EnableQueryNode {
|
||||
log.Print("start as query node")
|
||||
go func() {
|
||||
var err error
|
||||
queryNode, err = components.NewQueryNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = queryNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var dataService *components.DataService
|
||||
if mr.EnableDataService {
|
||||
log.Print("start as data service")
|
||||
go func() {
|
||||
var err error
|
||||
dataService, err = components.NewDataService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = dataService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var dataNode *components.DataNode
|
||||
if mr.EnableDataNode {
|
||||
log.Print("start as data node")
|
||||
go func() {
|
||||
var err error
|
||||
dataNode, err = components.NewDataNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = dataNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var indexService *components.IndexService
|
||||
if mr.EnableIndexService {
|
||||
log.Print("start as index service")
|
||||
go func() {
|
||||
var err error
|
||||
indexService, err = components.NewIndexService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = indexService.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var indexNode *components.IndexNode
|
||||
if mr.EnableIndexNode {
|
||||
log.Print("start as index node")
|
||||
go func() {
|
||||
var err error
|
||||
indexNode, err = components.NewIndexNode(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = indexNode.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
var msgStream *components.MsgStream
|
||||
if mr.EnableMsgStreamService {
|
||||
log.Print("start as msg stream service")
|
||||
go func() {
|
||||
var err error
|
||||
msgStream, err = components.NewMsgStreamService(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = msgStream.Run()
|
||||
}()
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
sig := <-sc
|
||||
log.Printf("Get %s signal to exit", sig.String())
|
||||
|
||||
if mr.EnableMaster {
|
||||
if masterService != nil {
|
||||
_ = masterService.Stop()
|
||||
}
|
||||
log.Printf("exit master service")
|
||||
}
|
||||
|
||||
if mr.EnableProxyService {
|
||||
if proxyService != nil {
|
||||
_ = proxyService.Stop()
|
||||
}
|
||||
log.Printf("exit proxy service")
|
||||
}
|
||||
|
||||
if mr.EnableProxyNode {
|
||||
if proxyNode != nil {
|
||||
_ = proxyNode.Stop()
|
||||
}
|
||||
log.Printf("exit proxy node")
|
||||
}
|
||||
|
||||
if mr.EnableQueryService {
|
||||
if queryService != nil {
|
||||
_ = queryService.Stop()
|
||||
}
|
||||
log.Printf("exit query service")
|
||||
}
|
||||
|
||||
if mr.EnableQueryNode {
|
||||
if queryNode != nil {
|
||||
_ = queryNode.Stop()
|
||||
}
|
||||
log.Printf("exit query node")
|
||||
}
|
||||
|
||||
if mr.EnableDataService {
|
||||
if dataService != nil {
|
||||
_ = dataService.Stop()
|
||||
}
|
||||
log.Printf("exit data service")
|
||||
}
|
||||
|
||||
if mr.EnableDataNode {
|
||||
if dataNode != nil {
|
||||
_ = dataNode.Stop()
|
||||
}
|
||||
log.Printf("exit data node")
|
||||
}
|
||||
|
||||
if mr.EnableIndexService {
|
||||
if indexService != nil {
|
||||
_ = indexService.Stop()
|
||||
}
|
||||
log.Printf("exit index service")
|
||||
}
|
||||
|
||||
if mr.EnableIndexNode {
|
||||
if indexNode != nil {
|
||||
_ = indexNode.Stop()
|
||||
}
|
||||
log.Printf("exit index node")
|
||||
}
|
||||
|
||||
if mr.EnableMsgStreamService {
|
||||
if msgStream != nil {
|
||||
_ = msgStream.Stop()
|
||||
}
|
||||
log.Printf("exit msg stream service")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
package roles
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRoles(t *testing.T) {
|
||||
r := MilvusRoles{}
|
||||
assert.False(t, r.HasAnyRole())
|
||||
|
||||
assert.True(t, r.EnvValue("1"))
|
||||
assert.True(t, r.EnvValue(" 1 "))
|
||||
assert.True(t, r.EnvValue("True"))
|
||||
assert.True(t, r.EnvValue(" True "))
|
||||
assert.True(t, r.EnvValue(" TRue "))
|
||||
assert.False(t, r.EnvValue("0"))
|
||||
assert.False(t, r.EnvValue(" 0 "))
|
||||
assert.False(t, r.EnvValue(" false "))
|
||||
assert.False(t, r.EnvValue(" False "))
|
||||
assert.False(t, r.EnvValue(" abc "))
|
||||
|
||||
ss := strings.SplitN("abcdef", "=", 2)
|
||||
assert.Equal(t, len(ss), 1)
|
||||
ss = strings.SplitN("adb=def", "=", 2)
|
||||
assert.Equal(t, len(ss), 2)
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableMaster = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableProxyService = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableProxyNode = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableQueryService = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableQueryNode = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableDataService = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableDataNode = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableIndexService = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableIndexNode = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
|
||||
{
|
||||
var roles MilvusRoles
|
||||
roles.EnableMsgStreamService = true
|
||||
assert.True(t, roles.HasAnyRole())
|
||||
}
|
||||
}
|
|
@ -1,155 +1,24 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proxynode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/querynode"
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/roles"
|
||||
)
|
||||
|
||||
func InitProxy(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
//proxynode.Init()
|
||||
//fmt.Println("ProxyID is", proxynode.Params.ProxyID())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
svr, err := proxynode.NewProxyNodeImpl(ctx)
|
||||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
|
||||
var sig os.Signal
|
||||
go func() {
|
||||
sig = <-sc
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if err := svr.Init(); err != nil {
|
||||
log.Fatal("init server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
if err := svr.Start(); err != nil {
|
||||
log.Fatal("run server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
svr.Stop()
|
||||
switch sig {
|
||||
case syscall.SIGTERM:
|
||||
exit(0)
|
||||
default:
|
||||
exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func InitQueryNode(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
querynode.Init()
|
||||
fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID)
|
||||
// Creates server.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
svr := querynode.NewQueryNode(ctx, 0)
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
|
||||
var sig os.Signal
|
||||
go func() {
|
||||
sig = <-sc
|
||||
cancel()
|
||||
}()
|
||||
|
||||
svr.Start()
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
svr.Stop()
|
||||
switch sig {
|
||||
case syscall.SIGTERM:
|
||||
exit(0)
|
||||
default:
|
||||
exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func InitIndexBuilder(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
svr, err := indexnode.NewNodeImpl(ctx)
|
||||
if err != nil {
|
||||
log.Print("create server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
|
||||
var sig os.Signal
|
||||
go func() {
|
||||
sig = <-sc
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if err := svr.Init(); err != nil {
|
||||
log.Fatal("init builder server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
if err := svr.Start(); err != nil {
|
||||
log.Fatal("run builder server failed", zap.Error(err))
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
log.Print("Got signal to exit", zap.String("signal", sig.String()))
|
||||
|
||||
svr.Stop()
|
||||
switch sig {
|
||||
case syscall.SIGTERM:
|
||||
exit(0)
|
||||
default:
|
||||
exit(1)
|
||||
}
|
||||
func initRoles(roles *roles.MilvusRoles) {
|
||||
roles.EnableMaster = true
|
||||
roles.EnableProxyService = true
|
||||
roles.EnableProxyNode = true
|
||||
roles.EnableQueryService = true
|
||||
roles.EnableQueryNode = true
|
||||
roles.EnableDataService = true
|
||||
roles.EnableDataNode = true
|
||||
roles.EnableIndexService = true
|
||||
roles.EnableIndexNode = true
|
||||
roles.EnableMsgStreamService = true
|
||||
}
|
||||
|
||||
func main() {
|
||||
var wg sync.WaitGroup
|
||||
flag.Parse()
|
||||
time.Sleep(time.Second * 1)
|
||||
wg.Add(1)
|
||||
go InitProxy(&wg)
|
||||
wg.Add(1)
|
||||
go InitQueryNode(&wg)
|
||||
wg.Add(1)
|
||||
go InitIndexBuilder(&wg)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func exit(code int) {
|
||||
os.Exit(code)
|
||||
var roles roles.MilvusRoles
|
||||
initRoles(&roles)
|
||||
roles.Run()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue