mirror of https://github.com/milvus-io/milvus.git
Generate random node port for indexService
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/4973/head^2
parent
76bc365181
commit
7619d968fd
4
Makefile
4
Makefile
|
@ -125,6 +125,10 @@ build-go: build-cpp
|
|||
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null
|
||||
@echo "Building singlenode ..."
|
||||
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/singlenode $(PWD)/cmd/singlenode/main.go 1>/dev/null
|
||||
@echo "Building distributed indexservice ..."
|
||||
@mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null
|
||||
@echo "Building distributed indexnode ..."
|
||||
@mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null
|
||||
|
||||
build-cpp:
|
||||
@(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)")
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
grpcindexnode.Init()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
svr, err := grpcindexnode.CreateIndexNode(ctx)
|
||||
if err != nil {
|
|
@ -23,7 +23,6 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
grpcindexserver.Init()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
svr, err := grpcindexserver.CreateIndexServer(ctx)
|
||||
if err != nil {
|
|
@ -1,4 +1,4 @@
|
|||
package grpcindexnode
|
||||
package grpcindexnodeclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
@ -15,32 +14,6 @@ type Client struct {
|
|||
grpcClient indexpb.IndexNodeClient
|
||||
}
|
||||
|
||||
func (c Client) Init() {
|
||||
//TODO:???
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c Client) Start() {
|
||||
//TODO:???
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c Client) Stop() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c Client) GetTimeTickChannel() (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c Client) GetStatisticsChannel() (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c Client) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
|
||||
|
||||
ctx := context.TODO()
|
||||
|
@ -53,9 +26,9 @@ func NewClient(nodeAddress string) *Client {
|
|||
defer cancel()
|
||||
conn, err := grpc.DialContext(ctx1, nodeAddress, grpc.WithInsecure(), grpc.WithBlock())
|
||||
if err != nil {
|
||||
log.Printf("IndexNode connect to IndexService failed, error= %v", err)
|
||||
log.Printf("Connect to IndexNode failed, error= %v", err)
|
||||
}
|
||||
log.Printf("IndexNode connected to IndexService, IndexService=%s", Params.Address)
|
||||
log.Printf("Connected to IndexService, IndexService=%s", nodeAddress)
|
||||
return &Client{
|
||||
grpcClient: indexpb.NewIndexNodeClient(conn),
|
||||
}
|
|
@ -1,177 +0,0 @@
|
|||
package grpcindexnode
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
)
|
||||
|
||||
type ParamTable struct {
|
||||
paramtable.BaseTable
|
||||
|
||||
Address string
|
||||
Port int
|
||||
ServiceAddress string
|
||||
ServicePort int
|
||||
|
||||
NodeID int64
|
||||
|
||||
MasterAddress string
|
||||
|
||||
EtcdAddress string
|
||||
MetaRootPath string
|
||||
|
||||
MinIOAddress string
|
||||
MinIOAccessKeyID string
|
||||
MinIOSecretAccessKey string
|
||||
MinIOUseSSL bool
|
||||
MinioBucketName string
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
|
||||
func (pt *ParamTable) Init() {
|
||||
pt.BaseTable.Init()
|
||||
pt.initAddress()
|
||||
pt.initPort()
|
||||
pt.initIndexServerAddr()
|
||||
pt.initIndexServerPort()
|
||||
pt.initEtcdAddress()
|
||||
pt.initMasterAddress()
|
||||
pt.initMetaRootPath()
|
||||
pt.initMinIOAddress()
|
||||
pt.initMinIOAccessKeyID()
|
||||
pt.initMinIOSecretAccessKey()
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initAddress() {
|
||||
addr, err := pt.Load("indexNode.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
hostName, _ := net.LookupHost(addr)
|
||||
if len(hostName) <= 0 {
|
||||
if ip := net.ParseIP(addr); ip == nil {
|
||||
panic("invalid ip indexBuilder.address")
|
||||
}
|
||||
}
|
||||
|
||||
port, err := pt.Load("indexNode.port")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = strconv.Atoi(port)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pt.Address = addr + ":" + port
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initPort() {
|
||||
pt.Port = pt.ParseInt("indexNode.port")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initIndexServerAddr() {
|
||||
addr, err := pt.Load("indexServer.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
hostName, _ := net.LookupHost(addr)
|
||||
if len(hostName) <= 0 {
|
||||
if ip := net.ParseIP(addr); ip == nil {
|
||||
panic("invalid ip indexBuilder.address")
|
||||
}
|
||||
}
|
||||
|
||||
port, err := pt.Load("indexServer.port")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = strconv.Atoi(port)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pt.ServiceAddress = addr + ":" + port
|
||||
}
|
||||
|
||||
func (pt ParamTable) initIndexServerPort() {
|
||||
pt.ServicePort = pt.ParseInt("indexServer.port")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initEtcdAddress() {
|
||||
addr, err := pt.Load("_EtcdAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.EtcdAddress = addr
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMetaRootPath() {
|
||||
rootPath, err := pt.Load("etcd.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
subPath, err := pt.Load("etcd.metaSubPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MetaRootPath = rootPath + "/" + subPath
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMasterAddress() {
|
||||
ret, err := pt.Load("_MasterAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MasterAddress = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOAddress() {
|
||||
ret, err := pt.Load("_MinioAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOAddress = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOAccessKeyID() {
|
||||
ret, err := pt.Load("minio.accessKeyID")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOAccessKeyID = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOSecretAccessKey() {
|
||||
ret, err := pt.Load("minio.secretAccessKey")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOSecretAccessKey = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOUseSSL() {
|
||||
ret, err := pt.Load("minio.useSSL")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOUseSSL, err = strconv.ParseBool(ret)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinioBucketName() {
|
||||
bucketName, err := pt.Load("minio.bucketName")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinioBucketName = bucketName
|
||||
}
|
|
@ -7,7 +7,7 @@ import (
|
|||
"strconv"
|
||||
"sync"
|
||||
|
||||
grpcindexservice "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice"
|
||||
serviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
|
@ -19,27 +19,31 @@ type Server struct {
|
|||
|
||||
grpcServer *grpc.Server
|
||||
|
||||
indexNodeLoopCtx context.Context
|
||||
indexNodeLoopCancel func()
|
||||
indexNodeLoopWg sync.WaitGroup
|
||||
loopCtx context.Context
|
||||
loopCancel func()
|
||||
loopWg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewGrpcServer(ctx context.Context, indexID int64) *Server {
|
||||
|
||||
func NewGrpcServer(ctx context.Context, nodeID int64) *Server {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
return &Server{
|
||||
node: indexnode.NewIndexNode(ctx, indexID),
|
||||
loopCtx: ctx1,
|
||||
loopCancel: cancel,
|
||||
node: indexnode.NewIndexNode(ctx, nodeID),
|
||||
}
|
||||
}
|
||||
|
||||
func registerNode() error {
|
||||
|
||||
indexServiceClient := grpcindexservice.NewClient(Params.ServiceAddress)
|
||||
indexServiceClient := serviceclient.NewClient(indexnode.Params.ServiceAddress)
|
||||
|
||||
log.Printf("Registering node. IP = %s, Port = %d", indexnode.Params.NodeIP, indexnode.Params.NodePort)
|
||||
|
||||
request := &indexpb.RegisterNodeRequest{
|
||||
Base: nil,
|
||||
Address: &commonpb.Address{
|
||||
Ip: Params.Address,
|
||||
Port: int64(Params.Port),
|
||||
Ip: indexnode.Params.NodeIP,
|
||||
Port: int64(indexnode.Params.NodePort),
|
||||
},
|
||||
}
|
||||
resp, err := indexServiceClient.RegisterNode(request)
|
||||
|
@ -48,17 +52,17 @@ func registerNode() error {
|
|||
return err
|
||||
}
|
||||
|
||||
Params.NodeID = resp.InitParams.NodeID
|
||||
log.Println("Register indexNode successful with nodeID=", Params.NodeID)
|
||||
indexnode.Params.NodeID = resp.InitParams.NodeID
|
||||
log.Println("Register indexNode successful with nodeID=", indexnode.Params.NodeID)
|
||||
|
||||
err = Params.LoadFromKVPair(resp.InitParams.StartParams)
|
||||
err = indexnode.Params.LoadFromKVPair(resp.InitParams.StartParams)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) grpcLoop() {
|
||||
defer s.indexNodeLoopWg.Done()
|
||||
defer s.loopWg.Done()
|
||||
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.Port))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexnode.Params.NodePort))
|
||||
if err != nil {
|
||||
log.Fatalf("IndexNode grpc server fatal error=%v", err)
|
||||
}
|
||||
|
@ -68,45 +72,66 @@ func (s *Server) grpcLoop() {
|
|||
if err = s.grpcServer.Serve(lis); err != nil {
|
||||
log.Fatalf("IndexNode grpc server fatal error=%v", err)
|
||||
}
|
||||
log.Println("IndexNode grpc server starting...")
|
||||
}
|
||||
|
||||
func (s *Server) startIndexNode() error {
|
||||
s.indexNodeLoopWg.Add(1)
|
||||
s.loopWg.Add(1)
|
||||
//TODO: How to make sure that grpc server has started successfully
|
||||
go s.grpcLoop()
|
||||
|
||||
log.Println("IndexNode grpc server start successfully")
|
||||
|
||||
err := registerNode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Params.Init()
|
||||
indexnode.Params.Init()
|
||||
return nil
|
||||
}
|
||||
|
||||
func Init() {
|
||||
Params.Init()
|
||||
func (s *Server) Init() {
|
||||
indexnode.Params.Init()
|
||||
|
||||
//Get native ip
|
||||
addresses, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for _, value := range addresses {
|
||||
if ipnet, ok := value.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||||
if ipnet.IP.To4() != nil {
|
||||
indexnode.Params.NodeIP = ipnet.IP.String()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Generate random and available port
|
||||
listener, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
indexnode.Params.NodePort = listener.Addr().(*net.TCPAddr).Port
|
||||
listener.Close()
|
||||
indexnode.Params.NodeAddress = indexnode.Params.NodeIP + ":" + strconv.FormatInt(int64(indexnode.Params.NodePort), 10)
|
||||
log.Println("IndexNode init successfully, nodeAddress=", indexnode.Params.NodeAddress)
|
||||
}
|
||||
|
||||
func CreateIndexNode(ctx context.Context) (*Server, error) {
|
||||
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
s := &Server{
|
||||
indexNodeLoopCtx: ctx1,
|
||||
indexNodeLoopCancel: cancel,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
return NewGrpcServer(ctx, indexnode.Params.NodeID), nil
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
|
||||
s.Init()
|
||||
return s.startIndexNode()
|
||||
}
|
||||
|
||||
func (s *Server) Stop() {
|
||||
s.indexNodeLoopWg.Wait()
|
||||
s.loopWg.Wait()
|
||||
}
|
||||
|
||||
func (s *Server) Close() {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package grpcindexservice
|
||||
package grpcindexserviceclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -6,41 +6,14 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
grpcClient indexpb.IndexServiceClient
|
||||
}
|
||||
|
||||
func (g Client) Init() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (g Client) Start() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (g Client) Stop() {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (g Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (g Client) GetTimeTickChannel() (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (g Client) GetStatisticsChannel() (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (g Client) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
|
||||
|
||||
ctx := context.TODO()
|
||||
|
@ -80,9 +53,9 @@ func NewClient(address string) *Client {
|
|||
defer cancel()
|
||||
conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock())
|
||||
if err != nil {
|
||||
log.Printf("IndexNode connect to IndexService failed, error= %v", err)
|
||||
log.Printf("Connect to IndexService failed, error= %v", err)
|
||||
}
|
||||
log.Printf("IndexNode connected to IndexService, IndexService=%s", Params.Address)
|
||||
log.Printf("Connected to IndexService, IndexService=%s", address)
|
||||
|
||||
return &Client{
|
||||
grpcClient: indexpb.NewIndexServiceClient(conn),
|
|
@ -1,144 +0,0 @@
|
|||
package grpcindexservice
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
)
|
||||
|
||||
type ParamTable struct {
|
||||
paramtable.BaseTable
|
||||
|
||||
Address string
|
||||
Port int
|
||||
|
||||
NodeID int64
|
||||
|
||||
MasterAddress string
|
||||
|
||||
EtcdAddress string
|
||||
MetaRootPath string
|
||||
|
||||
MinIOAddress string
|
||||
MinIOAccessKeyID string
|
||||
MinIOSecretAccessKey string
|
||||
MinIOUseSSL bool
|
||||
MinioBucketName string
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
|
||||
func (pt *ParamTable) Init() {
|
||||
pt.BaseTable.Init()
|
||||
pt.initAddress()
|
||||
pt.initPort()
|
||||
pt.initEtcdAddress()
|
||||
pt.initMasterAddress()
|
||||
pt.initMetaRootPath()
|
||||
pt.initMinIOAddress()
|
||||
pt.initMinIOAccessKeyID()
|
||||
pt.initMinIOSecretAccessKey()
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initAddress() {
|
||||
addr, err := pt.Load("indexServer.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
hostName, _ := net.LookupHost(addr)
|
||||
if len(hostName) <= 0 {
|
||||
if ip := net.ParseIP(addr); ip == nil {
|
||||
panic("invalid ip indexBuilder.address")
|
||||
}
|
||||
}
|
||||
|
||||
port, err := pt.Load("indexServer.port")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = strconv.Atoi(port)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pt.Address = addr + ":" + port
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initPort() {
|
||||
pt.Port = pt.ParseInt("indexServer.port")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initEtcdAddress() {
|
||||
addr, err := pt.Load("_EtcdAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.EtcdAddress = addr
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMetaRootPath() {
|
||||
rootPath, err := pt.Load("etcd.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
subPath, err := pt.Load("etcd.metaSubPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MetaRootPath = rootPath + "/" + subPath
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMasterAddress() {
|
||||
ret, err := pt.Load("_MasterAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MasterAddress = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOAddress() {
|
||||
ret, err := pt.Load("_MinioAddress")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOAddress = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOAccessKeyID() {
|
||||
ret, err := pt.Load("minio.accessKeyID")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOAccessKeyID = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOSecretAccessKey() {
|
||||
ret, err := pt.Load("minio.secretAccessKey")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOSecretAccessKey = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOUseSSL() {
|
||||
ret, err := pt.Load("minio.useSSL")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinIOUseSSL, err = strconv.ParseBool(ret)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinioBucketName() {
|
||||
bucketName, err := pt.Load("minio.bucketName")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.MinioBucketName = bucketName
|
||||
}
|
|
@ -29,13 +29,11 @@ type Server struct {
|
|||
}
|
||||
|
||||
func (s *Server) Init() {
|
||||
log.Println("initing params ...")
|
||||
Params.Init()
|
||||
indexservice.Params.Init()
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
s.Init()
|
||||
log.Println("stringing indexserver ...")
|
||||
return s.startIndexServer()
|
||||
}
|
||||
|
||||
|
@ -64,20 +62,6 @@ func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequ
|
|||
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
|
||||
|
||||
return s.server.BuildIndex(req)
|
||||
//indexID := int64(0)
|
||||
//request := &indexpb.BuildIndexCmd{
|
||||
// IndexID: indexID,
|
||||
// Req: req,
|
||||
//}
|
||||
//
|
||||
//indexNodeClient := grpcindexnode.NewClient()
|
||||
//
|
||||
//status, err := indexNodeClient.BuildIndex(request)
|
||||
//response := &indexpb.BuildIndexResponse{
|
||||
// Status: status,
|
||||
// IndexID: indexID,
|
||||
//}
|
||||
//return response, err
|
||||
}
|
||||
|
||||
func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
|
||||
|
@ -95,8 +79,6 @@ func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNo
|
|||
return s.server.NotifyBuildIndex(nty)
|
||||
}
|
||||
|
||||
//varindex
|
||||
|
||||
func NewServer() *Server {
|
||||
|
||||
return &Server{
|
||||
|
@ -109,7 +91,7 @@ func (s *Server) grpcLoop() {
|
|||
defer s.loopWg.Done()
|
||||
|
||||
log.Println("Starting start IndexServer")
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(Params.Port))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexservice.Params.Port))
|
||||
if err != nil {
|
||||
log.Fatalf("IndexServer grpc server fatal error=%v", err)
|
||||
}
|
||||
|
@ -121,20 +103,16 @@ func (s *Server) grpcLoop() {
|
|||
if err = s.grpcServer.Serve(lis); err != nil {
|
||||
log.Fatalf("IndexServer grpc server fatal error=%v", err)
|
||||
}
|
||||
log.Println("IndexServer grpc server starting...")
|
||||
}
|
||||
|
||||
func (s *Server) startIndexServer() error {
|
||||
s.loopWg.Add(1)
|
||||
go s.grpcLoop()
|
||||
log.Println("IndexServer grpc server start successfully")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Init() {
|
||||
Params.Init()
|
||||
}
|
||||
|
||||
func CreateIndexServer(ctx context.Context) (*Server, error) {
|
||||
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
|
|
|
@ -10,8 +10,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexservice"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
|
||||
|
@ -43,8 +41,8 @@ type IndexNode struct {
|
|||
startCallbacks []func()
|
||||
closeCallbacks []func()
|
||||
|
||||
indexNodeID int64
|
||||
serviceClient indexservice.Interface // method factory
|
||||
indexNodeID int64
|
||||
//serviceClient indexservice.Interface // method factory
|
||||
}
|
||||
|
||||
func (i *IndexNode) Init() {
|
||||
|
@ -72,58 +70,25 @@ func (i *IndexNode) GetStatisticsChannel() (string, error) {
|
|||
}
|
||||
|
||||
func (i *IndexNode) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
|
||||
//TODO: build index in index node
|
||||
ctx := context.Background()
|
||||
t := NewIndexAddTask()
|
||||
t.req = req.Req
|
||||
t.idAllocator = i.idAllocator
|
||||
t.buildQueue = i.sched.IndexBuildQueue
|
||||
t.table = i.metaTable
|
||||
t.kv = i.kv
|
||||
var cancel func()
|
||||
t.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
|
||||
defer cancel()
|
||||
|
||||
fn := func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errors.New("insert timeout")
|
||||
default:
|
||||
return i.sched.IndexAddQueue.Enqueue(t)
|
||||
}
|
||||
}
|
||||
ret := &commonpb.Status{
|
||||
log.Println("Create index with indexID=", req.IndexID)
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
||||
Reason: "",
|
||||
}
|
||||
|
||||
err := fn()
|
||||
if err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
|
||||
ret.Reason = err.Error()
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
err = t.WaitToFinish()
|
||||
if err != nil {
|
||||
ret.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
|
||||
ret.Reason = err.Error()
|
||||
return ret, nil
|
||||
}
|
||||
return ret, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func CreateIndexNode(ctx context.Context) (*IndexNode, error) {
|
||||
return &IndexNode{}, nil
|
||||
}
|
||||
|
||||
func NewIndexNode(ctx context.Context, indexID int64) *IndexNode {
|
||||
func NewIndexNode(ctx context.Context, nodeID int64) *IndexNode {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
in := &IndexNode{
|
||||
loopCtx: ctx1,
|
||||
loopCancel: cancel,
|
||||
|
||||
indexNodeID: indexID,
|
||||
indexNodeID: nodeID,
|
||||
}
|
||||
|
||||
return in
|
||||
|
|
|
@ -2,6 +2,7 @@ package indexnode
|
|||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
|
||||
|
@ -13,6 +14,14 @@ type ParamTable struct {
|
|||
Address string
|
||||
Port int
|
||||
|
||||
NodeAddress string
|
||||
NodeIP string
|
||||
NodePort int
|
||||
ServiceAddress string
|
||||
ServicePort int
|
||||
|
||||
NodeID int64
|
||||
|
||||
MasterAddress string
|
||||
|
||||
EtcdAddress string
|
||||
|
@ -31,6 +40,8 @@ func (pt *ParamTable) Init() {
|
|||
pt.BaseTable.Init()
|
||||
pt.initAddress()
|
||||
pt.initPort()
|
||||
pt.initIndexServerAddress()
|
||||
pt.initIndexServerPort()
|
||||
pt.initEtcdAddress()
|
||||
pt.initMasterAddress()
|
||||
pt.initMetaRootPath()
|
||||
|
@ -70,6 +81,42 @@ func (pt *ParamTable) initPort() {
|
|||
pt.Port = pt.ParseInt("indexBuilder.port")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initIndexServerAddress() {
|
||||
//TODO: save IndexService address in paramtable kv?
|
||||
serviceAddr := os.Getenv("INDEX_SERVICE_ADDRESS")
|
||||
if serviceAddr == "" {
|
||||
addr, err := pt.Load("indexServer.address")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
hostName, _ := net.LookupHost(addr)
|
||||
if len(hostName) <= 0 {
|
||||
if ip := net.ParseIP(addr); ip == nil {
|
||||
panic("invalid ip indexServer.address")
|
||||
}
|
||||
}
|
||||
|
||||
port, err := pt.Load("indexServer.port")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = strconv.Atoi(port)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pt.ServiceAddress = addr + ":" + port
|
||||
return
|
||||
}
|
||||
|
||||
pt.ServiceAddress = serviceAddr
|
||||
}
|
||||
|
||||
func (pt ParamTable) initIndexServerPort() {
|
||||
pt.ServicePort = pt.ParseInt("indexServer.port")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initEtcdAddress() {
|
||||
addr, err := pt.Load("_EtcdAddress")
|
||||
if err != nil {
|
||||
|
|
|
@ -8,7 +8,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/indexnode"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
|
@ -22,7 +24,7 @@ import (
|
|||
type IndexService struct {
|
||||
// implement Service
|
||||
|
||||
//nodeClients [] .Interface
|
||||
nodeClients []indexnode.Interface
|
||||
// factory method
|
||||
loopCtx context.Context
|
||||
loopCancel func()
|
||||
|
@ -80,18 +82,8 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.
|
|||
log.Println("this is register indexNode func")
|
||||
i.metaTable.nodeID2Address[nodeID] = req.Address
|
||||
|
||||
//TODO: register index node params?
|
||||
var params []*commonpb.KeyValuePair
|
||||
minioAddress, err := Params.Load("minio.address")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
minioPort, err := Params.Load("minio.port")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: minioAddress})
|
||||
params = append(params, &commonpb.KeyValuePair{Key: "minio.port", Value: minioPort})
|
||||
params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress})
|
||||
params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID})
|
||||
params = append(params, &commonpb.KeyValuePair{Key: "minio.secretAccessKey", Value: Params.MinIOSecretAccessKey})
|
||||
params = append(params, &commonpb.KeyValuePair{Key: "minio.useSSL", Value: strconv.FormatBool(Params.MinIOUseSSL)})
|
||||
|
@ -99,6 +91,11 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.
|
|||
|
||||
i.nodeNum++
|
||||
|
||||
nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
|
||||
log.Println(nodeAddress)
|
||||
nodeClient := grpcindexnodeclient.NewClient(nodeAddress)
|
||||
i.nodeClients = append(i.nodeClients, nodeClient)
|
||||
|
||||
return &indexpb.RegisterNodeResponse{
|
||||
InitParams: &internalpb2.InitParams{
|
||||
NodeID: nodeID,
|
||||
|
@ -108,28 +105,20 @@ func (i *IndexService) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.
|
|||
}
|
||||
|
||||
func (i *IndexService) BuildIndex(req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
|
||||
//TODO: Multiple indexes will build at same time.
|
||||
//ctx := context.Background()
|
||||
//indexNodeClient := indexnode.NewIndexNode(ctx, rand.Int63n(i.nodeNum))
|
||||
//
|
||||
////TODO: Allocator index ID
|
||||
//indexID := int64(0)
|
||||
//
|
||||
//request := &indexpb.BuildIndexCmd{
|
||||
// IndexID: indexID,
|
||||
// Req: req,
|
||||
//}
|
||||
//
|
||||
//status, err := indexNodeClient.BuildIndex(request)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
//
|
||||
//return &indexpb.BuildIndexResponse{
|
||||
// Status: status,
|
||||
// IndexID: indexID,
|
||||
//}, nil
|
||||
return nil, nil
|
||||
|
||||
//TODO: Allocator ID
|
||||
indexID := int64(0)
|
||||
log.Println("Build index, indexID = ", indexID)
|
||||
nodeClient := i.nodeClients[0]
|
||||
request := &indexpb.BuildIndexCmd{
|
||||
IndexID: indexID,
|
||||
Req: req,
|
||||
}
|
||||
status, err := nodeClient.BuildIndex(request)
|
||||
return &indexpb.BuildIndexResponse{
|
||||
Status: status,
|
||||
IndexID: indexID,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (i *IndexService) GetIndexStates(req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
|
||||
|
|
|
@ -13,8 +13,6 @@ type ParamTable struct {
|
|||
Address string
|
||||
Port int
|
||||
|
||||
NodeID int64
|
||||
|
||||
MasterAddress string
|
||||
|
||||
EtcdAddress string
|
||||
|
|
Loading…
Reference in New Issue