Combine standalone binary into milvus (#5866)

* rename service to coord for cmd

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update docs

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update variable name

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* optimize roles.go

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* support milvus run standalone

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* remove cmd/standalone

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix static-check

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename proxynode to proxy

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename service to coord for compoments

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add comments for codacy check

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* set helm chart branch

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5869/head^2
Cai Yudong 2021-06-18 15:20:08 +08:00 committed by GitHub
parent 2875b10dc5
commit 3b0951e3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 650 additions and 518 deletions

View File

@ -83,15 +83,11 @@ binlog:
@echo "Building binlog ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null
standalone: build-cpp
@echo "Building Milvus standalone ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/standalone $(PWD)/cmd/standalone/main.go 1>/dev/null
milvus: build-cpp
@echo "Building Milvus distributed ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/milvus $(PWD)/cmd/distributed/main.go 1>/dev/null
@echo "Building Milvus ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null
build-go: standalone milvus
build-go: milvus
build-cpp:
@(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)")
@ -109,7 +105,6 @@ build-cpp-with-unittest:
# Runs the tests.
unittest: test-cpp test-go
#TODO: proxynode master query node writer's unittest
test-go:build-cpp
@echo "Running go unittests..."
@echo "disable go unittest for now, enable it later"
@ -130,7 +125,6 @@ docker: verifiers
# Builds each component and installs it to $GOPATH/bin.
install: all
@echo "Installing binary to './bin'"
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/standalone $(GOPATH)/bin/standalone
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/milvus $(GOPATH)/bin/milvus
@mkdir -p $(LIBRARY_PATH) && cp -f $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH)
@echo "Installation successful."
@ -141,5 +135,4 @@ clean:
@find . -name '*~' | xargs rm -fv
@rm -rf bin/
@rm -rf lib/
@rm $(GOPATH)/bin/standalone
@rm $(GOPATH)/bin/milvus

View File

@ -36,6 +36,7 @@ pipeline {
DOCKER_BUILDKIT = 1
CUSTOM_THIRDPARTY_PATH = "/tmp/third_party"
ARTIFACTS = "${env.WORKSPACE}/artifacts"
MILVUS_HELM_BRANCH = "rename"
}
stages {
stage('Test') {

View File

@ -45,6 +45,7 @@ pipeline {
DOCKER_CREDENTIALS_ID = "ba070c98-c8cc-4f7c-b657-897715f359fc"
DOKCER_REGISTRY_URL = "registry.zilliz.com"
TARGET_REPO = "${DOKCER_REGISTRY_URL}/milvus"
MILVUS_HELM_BRANCH = "rename"
}
stages {
stage('Test') {

View File

@ -18,31 +18,34 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
)
type DataService struct {
type DataCoord struct {
ctx context.Context
svr *grpcdataserviceclient.Server
}
func NewDataService(ctx context.Context, factory msgstream.Factory) (*DataService, error) {
// NewDataCoord creates a new DataCoord
func NewDataCoord(ctx context.Context, factory msgstream.Factory) (*DataCoord, error) {
s, err := grpcdataserviceclient.NewServer(ctx, factory)
if err != nil {
return nil, err
}
return &DataService{
return &DataCoord{
ctx: ctx,
svr: s,
}, nil
}
func (s *DataService) Run() error {
// Run starts service
func (s *DataCoord) Run() error {
if err := s.svr.Run(); err != nil {
return err
}
return nil
}
func (s *DataService) Stop() error {
// Stop terminates service
func (s *DataCoord) Stop() error {
if err := s.svr.Stop(); err != nil {
return err
}

View File

@ -24,6 +24,7 @@ type DataNode struct {
svr *grpcdatanode.Server
}
// NewDataNode creates a new DataNode
func NewDataNode(ctx context.Context, factory msgstream.Factory) (*DataNode, error) {
svr, err := grpcdatanode.NewServer(ctx, factory)
if err != nil {
@ -36,6 +37,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) (*DataNode, err
}, nil
}
// Run starts service
func (d *DataNode) Run() error {
if err := d.svr.Run(); err != nil {
panic(err)
@ -44,6 +46,7 @@ func (d *DataNode) Run() error {
return nil
}
// Stop terminates service
func (d *DataNode) Stop() error {
if err := d.svr.Stop(); err != nil {
return err

View File

@ -17,13 +17,14 @@ import (
grpcindexserver "github.com/milvus-io/milvus/internal/distributed/indexservice"
)
type IndexService struct {
type IndexCoord struct {
svr *grpcindexserver.Server
}
func NewIndexService(ctx context.Context) (*IndexService, error) {
// NewIndexService creates a new IndexCoord
func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
var err error
s := &IndexService{}
s := &IndexCoord{}
svr, err := grpcindexserver.NewServer(ctx)
if err != nil {
@ -32,13 +33,17 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
s.svr = svr
return s, nil
}
func (s *IndexService) Run() error {
// Run starts service
func (s *IndexCoord) Run() error {
if err := s.svr.Run(); err != nil {
return err
}
return nil
}
func (s *IndexService) Stop() error {
// Stop terminates service
func (s *IndexCoord) Stop() error {
if err := s.svr.Stop(); err != nil {
return err
}

View File

@ -21,6 +21,7 @@ type IndexNode struct {
svr *grpcindexnode.Server
}
// NewIndexNode creates a new IndexNode
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
var err error
n := &IndexNode{}
@ -32,12 +33,16 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
return n, nil
}
// Run starts service
func (n *IndexNode) Run() error {
if err := n.svr.Run(); err != nil {
return err
}
return nil
}
// Stop terminates service
func (n *IndexNode) Stop() error {
if err := n.svr.Stop(); err != nil {
return err

View File

@ -15,17 +15,20 @@ import (
"context"
)
func NewMsgStreamService(ctx context.Context) (*MsgStream, error) {
return nil, nil
}
type MsgStream struct {
}
// NewMsgStreamCoord nil
func NewMsgStreamCoord(ctx context.Context) (*MsgStream, error) {
return nil, nil
}
// Run nil
func (ps *MsgStream) Run() error {
return nil
}
// Stop nil
func (ps *MsgStream) Stop() error {
return nil
}

View File

@ -18,13 +18,14 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
)
type ProxyNode struct {
type Proxy struct {
svr *grpcproxynode.Server
}
func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) {
// NewProxy creates a new Proxy
func NewProxy(ctx context.Context, factory msgstream.Factory) (*Proxy, error) {
var err error
n := &ProxyNode{}
n := &Proxy{}
svr, err := grpcproxynode.NewServer(ctx, factory)
if err != nil {
@ -34,14 +35,16 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e
return n, nil
}
func (n *ProxyNode) Run() error {
// Run starts service
func (n *Proxy) Run() error {
if err := n.svr.Run(); err != nil {
return err
}
return nil
}
func (n *ProxyNode) Stop() error {
// Stop terminates service
func (n *Proxy) Stop() error {
if err := n.svr.Stop(); err != nil {
return err
}

View File

@ -18,31 +18,34 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
)
type QueryService struct {
type QueryCoord struct {
ctx context.Context
svr *grpcqueryservice.Server
}
func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) {
// NewQueryCoord creates a new QueryCoord
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
svr, err := grpcqueryservice.NewServer(ctx, factory)
if err != nil {
panic(err)
}
return &QueryService{
return &QueryCoord{
ctx: ctx,
svr: svr,
}, nil
}
func (qs *QueryService) Run() error {
// Run starts service
func (qs *QueryCoord) Run() error {
if err := qs.svr.Run(); err != nil {
panic(err)
}
return nil
}
func (qs *QueryService) Stop() error {
// Stop terminates service
func (qs *QueryCoord) Stop() error {
if err := qs.svr.Stop(); err != nil {
return err
}

View File

@ -23,8 +23,8 @@ type QueryNode struct {
svr *grpcquerynode.Server
}
// NewQueryNode creates a new QueryNode
func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, error) {
svr, err := grpcquerynode.NewServer(ctx, factory)
if err != nil {
return nil, err
@ -37,6 +37,7 @@ func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, e
}
// Run starts service
func (q *QueryNode) Run() error {
if err := q.svr.Run(); err != nil {
panic(err)
@ -44,6 +45,7 @@ func (q *QueryNode) Run() error {
return nil
}
// Stop terminates service
func (q *QueryNode) Stop() error {
if err := q.svr.Stop(); err != nil {
return err

View File

@ -20,7 +20,7 @@ import (
"github.com/opentracing/opentracing-go"
)
type MasterService struct {
type RootCoord struct {
ctx context.Context
svr *msc.Server
@ -28,27 +28,29 @@ type MasterService struct {
closer io.Closer
}
func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) {
// NewRootCoord creates a new RoorCoord
func NewRootCoord(ctx context.Context, factory msgstream.Factory) (*RootCoord, error) {
svr, err := msc.NewServer(ctx, factory)
if err != nil {
return nil, err
}
return &MasterService{
return &RootCoord{
ctx: ctx,
svr: svr,
}, nil
}
func (m *MasterService) Run() error {
if err := m.svr.Run(); err != nil {
// Run starts service
func (rc *RootCoord) Run() error {
if err := rc.svr.Run(); err != nil {
return err
}
return nil
}
func (m *MasterService) Stop() error {
if err := m.svr.Stop(); err != nil {
// Stop terminates service
func (rc *RootCoord) Stop() error {
if err := rc.svr.Stop(); err != nil {
return err
}
return nil

View File

@ -1,333 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package roles
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/milvus-io/milvus/internal/datanode"
"github.com/milvus-io/milvus/internal/dataservice"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/indexservice"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/masterservice"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proxynode"
"github.com/milvus-io/milvus/internal/querynode"
"github.com/milvus-io/milvus/internal/queryservice"
"github.com/milvus-io/milvus/cmd/distributed/components"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/trace"
)
func newMsgFactory(localMsg bool) msgstream.Factory {
if localMsg {
return msgstream.NewRmsFactory()
}
return msgstream.NewPmsFactory()
}
type MilvusRoles struct {
EnableMaster bool `env:"ENABLE_MASTER"`
EnableProxyNode bool `env:"ENABLE_PROXY_NODE"`
EnableQueryService bool `env:"ENABLE_QUERY_SERVICE"`
EnableQueryNode bool `env:"ENABLE_QUERY_NODE"`
EnableDataService bool `env:"ENABLE_DATA_SERVICE"`
EnableDataNode bool `env:"ENABLE_DATA_NODE"`
EnableIndexService bool `env:"ENABLE_INDEX_SERVICE"`
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
EnableMsgStreamService bool `env:"ENABLE_MSGSTREAM_SERVICE"`
}
func (mr *MilvusRoles) EnvValue(env string) bool {
env = strings.ToLower(env)
env = strings.Trim(env, " ")
return env == "1" || env == "true"
}
func (mr *MilvusRoles) Run(localMsg bool) {
if os.Getenv("DEPLOY_MODE") == "STANDALONE" {
closer := trace.InitTracing("standalone")
if closer != nil {
defer closer.Close()
}
}
ctx, cancel := context.WithCancel(context.Background())
if mr.EnableMaster {
var ms *components.MasterService
var wg sync.WaitGroup
wg.Add(1)
go func() {
masterservice.Params.Init()
logutil.SetupLogger(&masterservice.Params.Log)
defer log.Sync()
factory := newMsgFactory(localMsg)
var err error
ms, err = components.NewMasterService(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = ms.Run()
}()
wg.Wait()
if ms != nil {
defer ms.Stop()
}
metrics.RegisterRootCoord()
}
if mr.EnableProxyNode {
var pn *components.ProxyNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
proxynode.Params.Init()
logutil.SetupLogger(&proxynode.Params.Log)
defer log.Sync()
factory := newMsgFactory(localMsg)
var err error
pn, err = components.NewProxyNode(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = pn.Run()
}()
wg.Wait()
if pn != nil {
defer pn.Stop()
}
metrics.RegisterProxyNode()
}
if mr.EnableQueryService {
var qs *components.QueryService
var wg sync.WaitGroup
wg.Add(1)
go func() {
queryservice.Params.Init()
logutil.SetupLogger(&queryservice.Params.Log)
defer log.Sync()
factory := newMsgFactory(localMsg)
var err error
qs, err = components.NewQueryService(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = qs.Run()
}()
wg.Wait()
if qs != nil {
defer qs.Stop()
}
metrics.RegisterQueryCoord()
}
if mr.EnableQueryNode {
var qn *components.QueryNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
querynode.Params.Init()
logutil.SetupLogger(&querynode.Params.Log)
defer log.Sync()
factory := newMsgFactory(localMsg)
var err error
qn, err = components.NewQueryNode(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = qn.Run()
}()
wg.Wait()
if qn != nil {
defer qn.Stop()
}
metrics.RegisterQueryNode()
}
if mr.EnableDataService {
var ds *components.DataService
var wg sync.WaitGroup
wg.Add(1)
go func() {
dataservice.Params.Init()
logutil.SetupLogger(&dataservice.Params.Log)
defer log.Sync()
factory := newMsgFactory(localMsg)
var err error
ds, err = components.NewDataService(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = ds.Run()
}()
wg.Wait()
if ds != nil {
defer ds.Stop()
}
metrics.RegisterDataCoord()
}
if mr.EnableDataNode {
var dn *components.DataNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
datanode.Params.Init()
logutil.SetupLogger(&datanode.Params.Log)
defer log.Sync()
factory := newMsgFactory(localMsg)
var err error
dn, err = components.NewDataNode(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = dn.Run()
}()
wg.Wait()
if dn != nil {
defer dn.Stop()
}
metrics.RegisterDataNode()
}
if mr.EnableIndexService {
var is *components.IndexService
var wg sync.WaitGroup
wg.Add(1)
go func() {
indexservice.Params.Init()
logutil.SetupLogger(&indexservice.Params.Log)
defer log.Sync()
var err error
is, err = components.NewIndexService(ctx)
if err != nil {
panic(err)
}
wg.Done()
_ = is.Run()
}()
wg.Wait()
if is != nil {
defer is.Stop()
}
metrics.RegisterIndexCoord()
}
if mr.EnableIndexNode {
var in *components.IndexNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
indexnode.Params.Init()
logutil.SetupLogger(&indexnode.Params.Log)
defer log.Sync()
var err error
in, err = components.NewIndexNode(ctx)
if err != nil {
panic(err)
}
wg.Done()
_ = in.Run()
}()
wg.Wait()
if in != nil {
defer in.Stop()
}
metrics.RegisterIndexNode()
}
if mr.EnableMsgStreamService {
var mss *components.MsgStream
var wg sync.WaitGroup
wg.Add(1)
go func() {
var err error
mss, err = components.NewMsgStreamService(ctx)
if err != nil {
panic(err)
}
wg.Done()
_ = mss.Run()
}()
wg.Wait()
if mss != nil {
defer mss.Stop()
}
metrics.RegisterMsgStreamCoord()
}
metrics.ServeHTTP()
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
fmt.Printf("Get %s signal to exit\n", sig.String())
// some deferred Stop has race with context cancel
cancel()
}

View File

@ -19,27 +19,28 @@ import (
"path"
"syscall"
"github.com/milvus-io/milvus/cmd/distributed/roles"
"github.com/milvus-io/milvus/cmd/roles"
)
const (
roleMaster = "master"
roleQueryService = "queryservice"
roleIndexService = "indexservice"
roleDataService = "dataservice"
roleProxyNode = "proxynode"
roleQueryNode = "querynode"
roleIndexNode = "indexnode"
roleDataNode = "datanode"
roleMixture = "mixture"
roleRootCoord = "rootcoord"
roleQueryCoord = "querycoord"
roleIndexCoord = "indexcoord"
roleDataCoord = "datacoord"
roleProxy = "proxy"
roleQueryNode = "querynode"
roleIndexNode = "indexnode"
roleDataNode = "datanode"
roleMixture = "mixture"
roleStandalone = "standalone"
)
func getPidFileName(service string, alias string) string {
func getPidFileName(serverType string, alias string) string {
var filename string
if len(alias) != 0 {
filename = fmt.Sprintf("%s-%s.pid", service, alias)
filename = fmt.Sprintf("%s-%s.pid", serverType, alias)
} else {
filename = service + ".pid"
filename = serverType + ".pid"
}
return filename
}
@ -135,39 +136,51 @@ func main() {
var svrAlias string
flags.StringVar(&svrAlias, "alias", "", "set alias")
var enableMaster, enableQueryService, enableIndexService, enableDataService bool
flags.BoolVar(&enableMaster, roleMaster, false, "enable master")
flags.BoolVar(&enableQueryService, roleQueryService, false, "enable query service")
flags.BoolVar(&enableIndexService, roleIndexService, false, "enable index service")
flags.BoolVar(&enableDataService, roleDataService, false, "enable data service")
var enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord bool
flags.BoolVar(&enableRootCoord, roleRootCoord, false, "enable root coordinator")
flags.BoolVar(&enableQueryCoord, roleQueryCoord, false, "enable query coordinator")
flags.BoolVar(&enableIndexCoord, roleIndexCoord, false, "enable index coordinator")
flags.BoolVar(&enableDataCoord, roleDataCoord, false, "enable data coordinator")
if err := flags.Parse(os.Args[3:]); err != nil {
os.Exit(-1)
}
var localMsg = false
role := roles.MilvusRoles{}
switch serverType {
case roleMaster:
role.EnableMaster = true
case roleProxyNode:
role.EnableProxyNode = true
case roleQueryService:
role.EnableQueryService = true
case roleRootCoord:
role.EnableRootCoord = true
case roleProxy:
role.EnableProxy = true
case roleQueryCoord:
role.EnableQueryCoord = true
case roleQueryNode:
role.EnableQueryNode = true
case roleDataService:
role.EnableDataService = true
case roleDataCoord:
role.EnableDataCoord = true
case roleDataNode:
role.EnableDataNode = true
case roleIndexService:
role.EnableIndexService = true
case roleIndexCoord:
role.EnableIndexCoord = true
case roleIndexNode:
role.EnableIndexNode = true
case roleMixture:
role.EnableMaster = enableMaster
role.EnableQueryService = enableQueryService
role.EnableDataService = enableDataService
role.EnableIndexService = enableIndexService
role.EnableRootCoord = enableRootCoord
role.EnableQueryCoord = enableQueryCoord
role.EnableDataCoord = enableDataCoord
role.EnableIndexCoord = enableIndexCoord
case roleStandalone:
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
role.EnableQueryNode = true
role.EnableDataCoord = true
role.EnableDataNode = true
role.EnableIndexCoord = true
role.EnableIndexNode = true
role.EnableMsgStreamCoord = true
localMsg = true
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n", serverType)
os.Exit(-1)
@ -191,7 +204,7 @@ func main() {
panic(err)
}
defer removePidFile(fd)
role.Run(false)
role.Run(localMsg)
case "stop":
if err := stopPid(filename, runtimeDir); err != nil {
panic(err)

423
cmd/roles/roles.go Normal file
View File

@ -0,0 +1,423 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package roles
import (
"context"
"fmt"
"os"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/datanode"
"github.com/milvus-io/milvus/internal/dataservice"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/indexservice"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/masterservice"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proxynode"
"github.com/milvus-io/milvus/internal/querynode"
"github.com/milvus-io/milvus/internal/queryservice"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
)
func newMsgFactory(localMsg bool) msgstream.Factory {
if localMsg {
return msgstream.NewRmsFactory()
}
return msgstream.NewPmsFactory()
}
type MilvusRoles struct {
EnableRootCoord bool `env:"ENABLE_ROOT_COORD"`
EnableProxy bool `env:"ENABLE_PROXY"`
EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"`
EnableQueryNode bool `env:"ENABLE_QUERY_NODE"`
EnableDataCoord bool `env:"ENABLE_DATA_COORD"`
EnableDataNode bool `env:"ENABLE_DATA_NODE"`
EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"`
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
EnableMsgStreamCoord bool `env:"ENABLE_MSGSTREAM_COORD"`
}
func (mr *MilvusRoles) EnvValue(env string) bool {
env = strings.ToLower(env)
env = strings.Trim(env, " ")
return env == "1" || env == "true"
}
func (mr *MilvusRoles) setLogConfigFilename(filename string) *log.Config {
paramtable.Params.Init()
cfg := paramtable.Params.LogConfig
if len(cfg.File.RootPath) != 0 {
cfg.File.Filename = path.Join(cfg.File.RootPath, filename)
} else {
cfg.File.Filename = ""
}
return cfg
}
func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *components.RootCoord {
var rc *components.RootCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
masterservice.Params.Init()
if !localMsg {
logutil.SetupLogger(&masterservice.Params.Log)
defer log.Sync()
}
factory := newMsgFactory(localMsg)
var err error
rc, err = components.NewRootCoord(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = rc.Run()
}()
wg.Wait()
metrics.RegisterRootCoord()
return rc
}
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *components.Proxy {
var pn *components.Proxy
var wg sync.WaitGroup
wg.Add(1)
go func() {
proxynode.Params.Init()
if !localMsg {
logutil.SetupLogger(&proxynode.Params.Log)
defer log.Sync()
}
factory := newMsgFactory(localMsg)
var err error
pn, err = components.NewProxy(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = pn.Run()
}()
wg.Wait()
metrics.RegisterProxyNode()
return pn
}
func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *components.QueryCoord {
var qs *components.QueryCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
queryservice.Params.Init()
if !localMsg {
logutil.SetupLogger(&queryservice.Params.Log)
defer log.Sync()
}
factory := newMsgFactory(localMsg)
var err error
qs, err = components.NewQueryCoord(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = qs.Run()
}()
wg.Wait()
metrics.RegisterQueryCoord()
return qs
}
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *components.QueryNode {
var qn *components.QueryNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
querynode.Params.Init()
if !localMsg {
logutil.SetupLogger(&querynode.Params.Log)
defer log.Sync()
}
factory := newMsgFactory(localMsg)
var err error
qn, err = components.NewQueryNode(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = qn.Run()
}()
wg.Wait()
metrics.RegisterQueryNode()
return qn
}
func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *components.DataCoord {
var ds *components.DataCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
dataservice.Params.Init()
if !localMsg {
logutil.SetupLogger(&dataservice.Params.Log)
defer log.Sync()
}
factory := newMsgFactory(localMsg)
var err error
ds, err = components.NewDataCoord(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = ds.Run()
}()
wg.Wait()
metrics.RegisterDataCoord()
return ds
}
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *components.DataNode {
var dn *components.DataNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
datanode.Params.Init()
if !localMsg {
logutil.SetupLogger(&datanode.Params.Log)
defer log.Sync()
}
factory := newMsgFactory(localMsg)
var err error
dn, err = components.NewDataNode(ctx, factory)
if err != nil {
panic(err)
}
wg.Done()
_ = dn.Run()
}()
wg.Wait()
metrics.RegisterDataNode()
return dn
}
func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *components.IndexCoord {
var is *components.IndexCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
indexservice.Params.Init()
if !localMsg {
logutil.SetupLogger(&indexservice.Params.Log)
defer log.Sync()
}
var err error
is, err = components.NewIndexCoord(ctx)
if err != nil {
panic(err)
}
wg.Done()
_ = is.Run()
}()
wg.Wait()
metrics.RegisterIndexCoord()
return is
}
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool) *components.IndexNode {
var in *components.IndexNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
indexnode.Params.Init()
if !localMsg {
logutil.SetupLogger(&indexnode.Params.Log)
defer log.Sync()
}
var err error
in, err = components.NewIndexNode(ctx)
if err != nil {
panic(err)
}
wg.Done()
_ = in.Run()
}()
wg.Wait()
metrics.RegisterIndexNode()
return in
}
func (mr *MilvusRoles) runMsgStreamCoord(ctx context.Context) *components.MsgStream {
var mss *components.MsgStream
var wg sync.WaitGroup
wg.Add(1)
go func() {
var err error
mss, err = components.NewMsgStreamCoord(ctx)
if err != nil {
panic(err)
}
wg.Done()
_ = mss.Run()
}()
wg.Wait()
metrics.RegisterMsgStreamCoord()
return mss
}
func (mr *MilvusRoles) Run(localMsg bool) {
if os.Getenv("DEPLOY_MODE") == "STANDALONE" {
closer := trace.InitTracing("standalone")
if closer != nil {
defer closer.Close()
}
}
ctx, cancel := context.WithCancel(context.Background())
// only standalone enable localMsg
if localMsg {
os.Setenv("QUERY_NODE_ID", "1")
os.Setenv("DEPLOY_MODE", "STANDALONE")
cfg := mr.setLogConfigFilename("standalone.log")
logutil.SetupLogger(cfg)
defer log.Sync()
}
var rc *components.RootCoord
if mr.EnableRootCoord {
rc = mr.runRootCoord(ctx, localMsg)
if rc != nil {
defer rc.Stop()
}
}
var pn *components.Proxy
if mr.EnableProxy {
pn = mr.runProxy(ctx, localMsg)
if pn != nil {
defer pn.Stop()
}
}
var qs *components.QueryCoord
if mr.EnableQueryCoord {
qs = mr.runQueryCoord(ctx, localMsg)
if qs != nil {
defer qs.Stop()
}
}
var qn *components.QueryNode
if mr.EnableQueryNode {
qn = mr.runQueryNode(ctx, localMsg)
if qn != nil {
defer qn.Stop()
}
}
var ds *components.DataCoord
if mr.EnableDataCoord {
ds = mr.runDataCoord(ctx, localMsg)
if ds != nil {
defer ds.Stop()
}
}
var dn *components.DataNode
if mr.EnableDataNode {
dn = mr.runDataNode(ctx, localMsg)
if dn != nil {
defer dn.Stop()
}
}
var is *components.IndexCoord
if mr.EnableIndexCoord {
is = mr.runIndexCoord(ctx, localMsg)
if is != nil {
defer is.Stop()
}
}
var in *components.IndexNode
if mr.EnableIndexNode {
in = mr.runIndexNode(ctx, localMsg)
if in != nil {
defer in.Stop()
}
}
var mss *components.MsgStream
if mr.EnableMsgStreamCoord {
mss = mr.runMsgStreamCoord(ctx)
if mss != nil {
defer mss.Stop()
}
}
metrics.ServeHTTP()
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
fmt.Printf("Get %s signal to exit\n", sig.String())
// some deferred Stop has race with context cancel
cancel()
}

View File

@ -1,63 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package main
import (
"os"
"github.com/milvus-io/milvus/cmd/distributed/roles"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
)
func initRoles(roles *roles.MilvusRoles) {
roles.EnableMaster = true
roles.EnableProxyNode = true
roles.EnableQueryService = true
roles.EnableQueryNode = true
roles.EnableDataService = true
roles.EnableDataNode = true
roles.EnableIndexService = true
roles.EnableIndexNode = true
roles.EnableMsgStreamService = true
}
func initLogCfg() log.Config {
logCfg := log.Config{}
logCfg.Format = "text"
logCfg.Level = "debug"
logCfg.Development = true
logCfg.File.MaxSize = 300
logCfg.File.MaxBackups = 20
logCfg.File.MaxDays = 10
// FIXME(wxyu): Load from config files
logCfg.File.Filename = ""
// ciFileDir := "/milvus/logs/"
// if _, err := os.Stat(ciFileDir); err == nil {
// logCfg.File.Filename = ciFileDir + "standalone.log"
// } else {
// logCfg.File.Filename = "/tmp/milvus/standalone.log"
// }
return logCfg
}
func main() {
var roles roles.MilvusRoles
initRoles(&roles)
os.Setenv("QUERY_NODE_ID", "1")
os.Setenv("DEPLOY_MODE", "STANDALONE")
logCfg := initLogCfg()
logutil.SetupLogger(&logCfg)
roles.Run(true)
}

View File

@ -4,7 +4,8 @@ Milvus 能够通过配置文件、命令行选项、环境变量进行配置。
优先级顺序: 命令行选项 > 环境变量 > 配置文件 > 默认值
如果提供了配置文件,则其他的命令行选项和环境变量都将被忽略。例如: `milvus run master --config-file milvus.yaml --log-level debug` 将忽略 `--log-level` 选项。
如果提供了配置文件,则其他的命令行选项和环境变量都将被忽略。
例如: `milvus run rootcoord --config-file milvus.yaml --log-level debug` 将忽略 `--log-level` 选项。
### 语法
@ -17,7 +18,7 @@ $ milvus [command] [server type] [flags]
例如:
```shell
$ MILVUS_CONFIG_FILE=/path/to/milvus/configs/milvus.yaml milvus run master
$ MILVUS_CONFIG_FILE=/path/to/milvus/configs/milvus.yaml milvus run rootcoord
```
`command` `server type` `flags` 分别表示为
@ -26,13 +27,13 @@ $ MILVUS_CONFIG_FILE=/path/to/milvus/configs/milvus.yaml milvus run master
`server type`:指定执行程序的类型。`server type` 有:
* `master`
* `proxy`
* `queryservice`
* `rootcoord`
* `proxynode`
* `querycoord`
* `querynode`
* `dataservice`
* `datacoord`
* `datanode`
* `indexservice`
* `indexcoord`
* `indexnode`
* `standalone`
* `mixture`
@ -41,17 +42,17 @@ $ MILVUS_CONFIG_FILE=/path/to/milvus/configs/milvus.yaml milvus run master
`server type``mixture` 时,必须附加以下几个 `flag` 中的一个或多个,表示这几个服务在一个进程内启动
* `-master`
* `-queryservice`
* `-dataservice`
* `-indexservice`
* `-rootcoord`
* `-querycoord`
* `-datacoord`
* `-indexcoord`
> Getting help
>
> You can get help for CLI tool using the `--help` flag, or `-h` for short.
> ```shell
> $ milvus run master --help
> $ milvus run rootcoord --help
> ```
### 命令行参数
@ -87,8 +88,7 @@ $ MILVUS_CONFIG_FILE=/path/to/milvus/configs/milvus.yaml milvus run master
| 名称 | 描述 | 默认值 |
| --------- | --------- | --------- |
| etcd.address | etcd 服务地址 | "localhost" |
| etcd.port | etcd 服务端口 | 2379 |
| etcd.endpoints | etcd 服务接入端 | "localhost:2379" |
| minio.address | minio 服务地址 | "localhost" |
| minio.port | minio 服务端口 | 9000 |
| pulsar.address | pulsar 服务地址 | "localhost" |

View File

@ -26,6 +26,8 @@ const (
// FileLogConfig serializes file log related config in toml/json.
type FileLogConfig struct {
// Log rootpath
RootPath string `toml:"rootpath" json:"rootpath"`
// Log filename, leave empty to disable file log.
Filename string `toml:"filename" json:"filename"`
// Max size for a single file, in MB.

View File

@ -18,10 +18,10 @@ import (
"github.com/stretchr/testify/assert"
)
var Params = BaseTable{}
var baseParams = BaseTable{}
func TestMain(m *testing.M) {
Params.Init()
baseParams.Init()
code := m.Run()
os.Exit(code)
}
@ -29,42 +29,42 @@ func TestMain(m *testing.M) {
//func TestMain
func TestGlobalParamsTable_SaveAndLoad(t *testing.T) {
err1 := Params.Save("int", "10")
err1 := baseParams.Save("int", "10")
assert.Nil(t, err1)
err2 := Params.Save("string", "testSaveAndLoad")
err2 := baseParams.Save("string", "testSaveAndLoad")
assert.Nil(t, err2)
err3 := Params.Save("float", "1.234")
err3 := baseParams.Save("float", "1.234")
assert.Nil(t, err3)
r1, _ := Params.Load("int")
r1, _ := baseParams.Load("int")
assert.Equal(t, "10", r1)
r2, _ := Params.Load("string")
r2, _ := baseParams.Load("string")
assert.Equal(t, "testSaveAndLoad", r2)
r3, _ := Params.Load("float")
r3, _ := baseParams.Load("float")
assert.Equal(t, "1.234", r3)
err4 := Params.Remove("int")
err4 := baseParams.Remove("int")
assert.Nil(t, err4)
err5 := Params.Remove("string")
err5 := baseParams.Remove("string")
assert.Nil(t, err5)
err6 := Params.Remove("float")
err6 := baseParams.Remove("float")
assert.Nil(t, err6)
}
func TestGlobalParamsTable_LoadRange(t *testing.T) {
_ = Params.Save("xxxaab", "10")
_ = Params.Save("xxxfghz", "20")
_ = Params.Save("xxxbcde", "1.1")
_ = Params.Save("xxxabcd", "testSaveAndLoad")
_ = Params.Save("xxxzhi", "12")
_ = baseParams.Save("xxxaab", "10")
_ = baseParams.Save("xxxfghz", "20")
_ = baseParams.Save("xxxbcde", "1.1")
_ = baseParams.Save("xxxabcd", "testSaveAndLoad")
_ = baseParams.Save("xxxzhi", "12")
keys, values, err := Params.LoadRange("xxxa", "xxxg", 10)
keys, values, err := baseParams.LoadRange("xxxa", "xxxg", 10)
assert.Nil(t, err)
assert.Equal(t, 4, len(keys))
assert.Equal(t, "10", values[0])
@ -72,42 +72,42 @@ func TestGlobalParamsTable_LoadRange(t *testing.T) {
assert.Equal(t, "1.1", values[2])
assert.Equal(t, "20", values[3])
_ = Params.Remove("abc")
_ = Params.Remove("fghz")
_ = Params.Remove("bcde")
_ = Params.Remove("abcd")
_ = Params.Remove("zhi")
_ = baseParams.Remove("abc")
_ = baseParams.Remove("fghz")
_ = baseParams.Remove("bcde")
_ = baseParams.Remove("abcd")
_ = baseParams.Remove("zhi")
}
func TestGlobalParamsTable_Remove(t *testing.T) {
err1 := Params.Save("RemoveInt", "10")
err1 := baseParams.Save("RemoveInt", "10")
assert.Nil(t, err1)
err2 := Params.Save("RemoveString", "testRemove")
err2 := baseParams.Save("RemoveString", "testRemove")
assert.Nil(t, err2)
err3 := Params.Save("RemoveFloat", "1.234")
err3 := baseParams.Save("RemoveFloat", "1.234")
assert.Nil(t, err3)
err4 := Params.Remove("RemoveInt")
err4 := baseParams.Remove("RemoveInt")
assert.Nil(t, err4)
err5 := Params.Remove("RemoveString")
err5 := baseParams.Remove("RemoveString")
assert.Nil(t, err5)
err6 := Params.Remove("RemoveFloat")
err6 := baseParams.Remove("RemoveFloat")
assert.Nil(t, err6)
}
func TestGlobalParamsTable_LoadYaml(t *testing.T) {
err := Params.LoadYaml("milvus.yaml")
err := baseParams.LoadYaml("milvus.yaml")
assert.Nil(t, err)
err = Params.LoadYaml("advanced/channel.yaml")
err = baseParams.LoadYaml("advanced/channel.yaml")
assert.Nil(t, err)
_, err = Params.Load("etcd.address")
_, err = baseParams.Load("etcd.address")
assert.Nil(t, err)
_, err = Params.Load("pulsar.port")
_, err = baseParams.Load("pulsar.port")
assert.Nil(t, err)
}

View File

@ -0,0 +1,66 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package paramtable
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/log"
)
var Params ParamTable
var once sync.Once
type ParamTable struct {
BaseTable
LogConfig *log.Config
}
func (p *ParamTable) Init() {
once.Do(func() {
p.BaseTable.Init()
p.initLogCfg()
})
}
func (p *ParamTable) initLogCfg() {
p.LogConfig = &log.Config{}
format, err := p.Load("log.format")
if err != nil {
panic(err)
}
p.LogConfig.Format = format
level, err := p.Load("log.level")
if err != nil {
panic(err)
}
p.LogConfig.Level = level
devStr, err := p.Load("log.dev")
if err != nil {
panic(err)
}
dev, err := strconv.ParseBool(devStr)
if err != nil {
panic(err)
}
p.LogConfig.Development = dev
p.LogConfig.File.MaxSize = p.ParseInt("log.file.maxSize")
p.LogConfig.File.MaxBackups = p.ParseInt("log.file.maxBackups")
p.LogConfig.File.MaxDays = p.ParseInt("log.file.maxAge")
p.LogConfig.File.RootPath, err = p.Load("log.file.rootPath")
if err != nil {
panic(err)
}
}