From 51ca741a495c04314dfa86d0ea1ec5bd922fab99 Mon Sep 17 00:00:00 2001 From: yukun Date: Thu, 14 Jan 2021 09:52:03 +0800 Subject: [PATCH] Add singlenode launch main Signed-off-by: yukun --- Makefile | 4 + cmd/singlenode/main.go | 230 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 cmd/singlenode/main.go diff --git a/Makefile b/Makefile index 883d600b18..aaa5497c96 100644 --- a/Makefile +++ b/Makefile @@ -99,6 +99,8 @@ build-go: build-cpp get-rocksdb @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/writenode $(PWD)/cmd/writenode/writenode.go 1>/dev/null @echo "Building binlog ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null + @echo "Building singlenode ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/singlenode $(PWD)/cmd/singlenode/main.go 1>/dev/null build-cpp: @(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)") @@ -134,6 +136,7 @@ install: all @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxy $(GOPATH)/bin/proxy @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/writenode $(GOPATH)/bin/writenode @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/indexbuilder $(GOPATH)/bin/indexbuilder + @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/singlenode $(GOPATH)/bin/singlenode @mkdir -p $(LIBRARY_PATH) && cp -f $(PWD)/internal/core/output/lib/* $(LIBRARY_PATH) @echo "Installation successful." @@ -148,3 +151,4 @@ clean: @rm -rf $(GOPATH)/bin/querynode @rm -rf $(GOPATH)/bin/writenode @rm -rf $(GOPATH)/bin/indexbuilder + @rm -rf $(GOPATH)/bin/singlenode diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go new file mode 100644 index 0000000000..99a0d3333b --- /dev/null +++ b/cmd/singlenode/main.go @@ -0,0 +1,230 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "os/signal" + "runtime/pprof" + "sync" + "syscall" + "time" + + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/indexbuilder" + "github.com/zilliztech/milvus-distributed/internal/master" + "github.com/zilliztech/milvus-distributed/internal/proxy" + "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/internal/writenode" +) + +func InitMaster(cpuprofile *string, wg *sync.WaitGroup) { + defer wg.Done() + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal(err) + } + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatal(err) + } + defer pprof.StopCPUProfile() + } + + master.Init() + + // Creates server. + ctx, cancel := context.WithCancel(context.Background()) + + svr, err := master.CreateServer(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + + if err := svr.Run(int64(master.Params.Port)); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + sig := <-sc + log.Print("Got signal to exit", zap.String("signal", sig.String())) + cancel() + svr.Close() +} + +func InitProxy(wg *sync.WaitGroup) { + defer wg.Done() + proxy.Init() + fmt.Println("ProxyID is", proxy.Params.ProxyID()) + ctx, cancel := context.WithCancel(context.Background()) + svr, err := proxy.CreateProxy(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Start(); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func InitQueryNode(wg *sync.WaitGroup) { + defer wg.Done() + querynode.Init() + fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) + // Creates server. + ctx, cancel := context.WithCancel(context.Background()) + svr := querynode.NewQueryNode(ctx, 0) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Start(); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func InitIndexBuilder(wg *sync.WaitGroup) { + defer wg.Done() + indexbuilder.Init() + ctx, cancel := context.WithCancel(context.Background()) + svr, err := indexbuilder.CreateBuilder(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Start(); err != nil { + log.Fatal("run builder server failed", zap.Error(err)) + } + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func InitWriteNode(wg *sync.WaitGroup) { + defer wg.Done() + writenode.Init() + fmt.Println("WriteNodeID is", writenode.Params.WriteNodeID) + // Creates server. + ctx, cancel := context.WithCancel(context.Background()) + svr := writenode.NewWriteNode(ctx, 111111) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Start(); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + <-ctx.Done() + log.Print("Got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func main() { + var wg sync.WaitGroup + cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file") + flag.Parse() + wg.Add(1) + go InitMaster(cpuprofile, &wg) + time.Sleep(time.Second * 1) + go InitProxy(&wg) + go InitQueryNode(&wg) + go InitIndexBuilder(&wg) + go InitWriteNode(&wg) + wg.Wait() +} + +func exit(code int) { + os.Exit(code) +}