mirror of https://github.com/milvus-io/milvus.git
parent
099220394e
commit
1cc994dea8
|
@ -2,58 +2,126 @@ package main
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"path"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/cmd/distributed/roles"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
)
|
||||
|
||||
func initRoles(roles *roles.MilvusRoles) {
|
||||
flag.BoolVar(&roles.EnableMaster, "master-service", false, "start as master service")
|
||||
flag.BoolVar(&roles.EnableProxyService, "proxy-service", false, "start as proxy service")
|
||||
flag.BoolVar(&roles.EnableProxyNode, "proxy-node", false, "start as proxy node")
|
||||
flag.BoolVar(&roles.EnableQueryService, "query-service", false, "start as query service")
|
||||
flag.BoolVar(&roles.EnableQueryNode, "query-node", false, "start as query node")
|
||||
flag.BoolVar(&roles.EnableDataService, "data-service", false, "start as data service")
|
||||
flag.BoolVar(&roles.EnableDataNode, "data-node", false, "start as data node")
|
||||
flag.BoolVar(&roles.EnableIndexService, "index-service", false, "start as index service")
|
||||
flag.BoolVar(&roles.EnableIndexNode, "index-node", false, "start as index node")
|
||||
flag.BoolVar(&roles.EnableMsgStreamService, "msg-stream", false, "start as msg stream service")
|
||||
flag.Parse()
|
||||
const (
|
||||
fileDir = "/run/milvus-distributed"
|
||||
)
|
||||
|
||||
if !roles.HasAnyRole() {
|
||||
for _, e := range os.Environ() {
|
||||
pairs := strings.SplitN(e, "=", 2)
|
||||
if len(pairs) == 2 {
|
||||
switch pairs[0] {
|
||||
case "ENABLE_MASTER":
|
||||
roles.EnableMaster = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_PROXY_SERVICE":
|
||||
roles.EnableProxyService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_PROXY_NODE":
|
||||
roles.EnableProxyNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_QUERY_SERVICE":
|
||||
roles.EnableQueryService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_QUERY_NODE":
|
||||
roles.EnableQueryNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_DATA_SERVICE":
|
||||
roles.EnableDataService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_DATA_NODE":
|
||||
roles.EnableDataNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_INDEX_SERVICE":
|
||||
roles.EnableIndexService = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_INDEX_NODE":
|
||||
roles.EnableIndexNode = roles.EnvValue(pairs[1])
|
||||
case "ENABLE_MSGSTREAM_SERVICE":
|
||||
roles.EnableMsgStreamService = roles.EnvValue(pairs[1])
|
||||
}
|
||||
}
|
||||
func run(serverType string) error {
|
||||
fileName := serverType + ".pid"
|
||||
_, err := os.Stat(path.Join(fileDir, fileName))
|
||||
var fd *os.File
|
||||
if os.IsNotExist(err) {
|
||||
if fd, err = os.OpenFile(path.Join(fileDir, fileName), os.O_CREATE|os.O_WRONLY, 0664); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = syscall.Close(int(fd.Fd()))
|
||||
_ = os.Remove(path.Join(fileDir, fileName))
|
||||
}()
|
||||
|
||||
if err := syscall.Flock(int(fd.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = fd.WriteString(fmt.Sprintf("%d", os.Getpid()))
|
||||
|
||||
} else {
|
||||
return errors.Errorf("service %s is running", serverType)
|
||||
}
|
||||
|
||||
role := roles.MilvusRoles{}
|
||||
switch serverType {
|
||||
case "master":
|
||||
role.EnableMaster = true
|
||||
case "msgstream":
|
||||
role.EnableMsgStreamService = true
|
||||
case "proxyservice":
|
||||
role.EnableProxyService = true
|
||||
case "proxynode":
|
||||
role.EnableProxyNode = true
|
||||
case "queryservice":
|
||||
role.EnableQueryService = true
|
||||
case "querynode":
|
||||
role.EnableQueryNode = true
|
||||
case "dataservice":
|
||||
role.EnableDataService = true
|
||||
case "datanode":
|
||||
role.EnableDataNode = true
|
||||
case "indexservice":
|
||||
role.EnableIndexService = true
|
||||
case "indexnode":
|
||||
role.EnableIndexNode = true
|
||||
case "standalone":
|
||||
role.EnableStandalone = true
|
||||
default:
|
||||
return errors.Errorf("unknown server type = %s", serverType)
|
||||
}
|
||||
role.Run()
|
||||
return nil
|
||||
}
|
||||
|
||||
func stop(serverType string) error {
|
||||
fileName := serverType + ".pid"
|
||||
var err error
|
||||
var fd *os.File
|
||||
if fd, err = os.OpenFile(path.Join(fileDir, fileName), os.O_RDONLY, 0664); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = fd.Close()
|
||||
}()
|
||||
var pid int
|
||||
_, err = fmt.Fscanf(fd, "%d", &pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.Signal(syscall.SIGTERM)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
var roles roles.MilvusRoles
|
||||
initRoles(&roles)
|
||||
roles.Run()
|
||||
if len(os.Args) < 3 {
|
||||
_, _ = fmt.Fprint(os.Stderr, "usage: milvus-distributed [command] [server type] [flags]\n")
|
||||
return
|
||||
}
|
||||
command := os.Args[1]
|
||||
serverType := os.Args[2]
|
||||
flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
|
||||
//flags.BoolVar()
|
||||
|
||||
if err := flags.Parse(os.Args[3:]); err != nil {
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(fileDir); os.IsNotExist(err) {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "please create dirctory /run/milvus-distributed, and set owner to current user")
|
||||
os.Exit(-1)
|
||||
}
|
||||
switch command {
|
||||
case "run":
|
||||
if err := run(serverType); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
case "stop":
|
||||
if err := stop(serverType); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
default:
|
||||
_, _ = fmt.Fprintf(os.Stderr, "unknown command : %s", command)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ type MilvusRoles struct {
|
|||
EnableIndexService bool `env:"ENABLE_INDEX_SERVICE"`
|
||||
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
|
||||
EnableMsgStreamService bool `env:"ENABLE_MSGSTREAM_SERVICE"`
|
||||
EnableStandalone bool `env:"ENABLE_STANDALONE"`
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) HasAnyRole() bool {
|
||||
|
@ -29,7 +30,7 @@ func (mr *MilvusRoles) HasAnyRole() bool {
|
|||
mr.EnableProxyService || mr.EnableProxyNode ||
|
||||
mr.EnableQueryService || mr.EnableQueryNode ||
|
||||
mr.EnableDataService || mr.EnableDataNode ||
|
||||
mr.EnableIndexService || mr.EnableIndexNode
|
||||
mr.EnableIndexService || mr.EnableIndexNode || mr.EnableStandalone
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) EnvValue(env string) bool {
|
||||
|
|
|
@ -33,6 +33,7 @@ where `command`, `server type`, and `flags` are:
|
|||
* `datanode`
|
||||
* `indexservice`
|
||||
* `indexnode`
|
||||
* `standalone`
|
||||
|
||||
`flags`: Specifies optional flags. For example, you can use the `-f` or `--config` flags to specify the configuration file.
|
||||
|
||||
|
|
Loading…
Reference in New Issue