From 0a383608f393494ad402de9be4494e8059e60304 Mon Sep 17 00:00:00 2001
From: sunby <bingyi.sun@zilliz.com>
Date: Thu, 31 Dec 2020 11:20:03 +0800
Subject: [PATCH] Remove mock client in master

Signed-off-by: sunby <bingyi.sun@zilliz.com>
---
 internal/master/client.go                  |  4 +-
 internal/master/index_builder_scheduler.go |  1 +
 internal/master/index_load_scheduler.go    |  9 ++-
 internal/master/index_task.go              |  1 +
 internal/master/master.go                  | 18 +++++-
 internal/master/param_table.go             | 22 ++++++++
 internal/master/param_table_test.go        |  5 ++
 internal/util/paramtable/paramtable.go     | 64 ++++++++++++++++++++--
 8 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/internal/master/client.go b/internal/master/client.go
index 34d7e73731..7c644e02ac 100644
--- a/internal/master/client.go
+++ b/internal/master/client.go
@@ -90,12 +90,12 @@ func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, er
 }
 
 type LoadIndexClient interface {
-	LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error
+	LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error
 }
 
 type MockLoadIndexClient struct {
 }
 
-func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error {
+func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error {
 	return nil
 }
diff --git a/internal/master/index_builder_scheduler.go b/internal/master/index_builder_scheduler.go
index 857ffd46b0..a67d92c8aa 100644
--- a/internal/master/index_builder_scheduler.go
+++ b/internal/master/index_builder_scheduler.go
@@ -133,6 +133,7 @@ func (scheduler *IndexBuildScheduler) describe() error {
 						fieldID:        indexBuildInfo.fieldID,
 						fieldName:      fieldName,
 						indexFilePaths: filePaths,
+						indexParams:    channelInfo.indexParams,
 					}
 					// Save data to meta table
 					err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{
diff --git a/internal/master/index_load_scheduler.go b/internal/master/index_load_scheduler.go
index 3d23cfcc8e..eec832b17f 100644
--- a/internal/master/index_load_scheduler.go
+++ b/internal/master/index_load_scheduler.go
@@ -3,12 +3,15 @@ package master
 import (
 	"context"
 	"log"
+
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 )
 
 type IndexLoadInfo struct {
 	segmentID      UniqueID
 	fieldID        UniqueID
 	fieldName      string
+	indexParams    []*commonpb.KeyValuePair
 	indexFilePaths []string
 }
 
@@ -36,7 +39,11 @@ func NewIndexLoadScheduler(ctx context.Context, client LoadIndexClient, metaTabl
 
 func (scheduler *IndexLoadScheduler) schedule(info interface{}) error {
 	indexLoadInfo := info.(*IndexLoadInfo)
-	err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName)
+	indexParams := make(map[string]string)
+	for _, kv := range indexLoadInfo.indexParams {
+		indexParams[kv.Key] = kv.Value
+	}
+	err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName, indexParams)
 	//TODO: Save data to meta table
 	if err != nil {
 		return err
diff --git a/internal/master/index_task.go b/internal/master/index_task.go
index c5cdcc4942..580d8f73a6 100644
--- a/internal/master/index_task.go
+++ b/internal/master/index_task.go
@@ -68,6 +68,7 @@ func (task *createIndexTask) Execute() error {
 				fieldID:        fieldID,
 				fieldName:      task.req.FieldName,
 				indexFilePaths: indexMeta.IndexFilePaths,
+				indexParams:    indexMeta.IndexParams,
 			})
 			if err != nil {
 				return err
diff --git a/internal/master/master.go b/internal/master/master.go
index d73d8f2907..90093cc22e 100644
--- a/internal/master/master.go
+++ b/internal/master/master.go
@@ -10,6 +10,12 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/querynode/client"
+
+	indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"
+
+	writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"
+
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
 	ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@@ -175,9 +181,15 @@ func CreateServer(ctx context.Context) (*Master, error) {
 	m.scheduler.SetDDMsgStream(pulsarDDStream)
 	m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
 
-	flushClient := &MockWriteNodeClient{}
-	buildIndexClient := &MockBuildIndexClient{}
-	loadIndexClient := &MockLoadIndexClient{}
+	flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, kvRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
+	if err != nil {
+		return nil, err
+	}
+	buildIndexClient, err := indexbuilderclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress)
+	if err != nil {
+		return nil, err
+	}
+	loadIndexClient := client.NewLoadIndexClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames)
 
 	m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable)
 	m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch)
diff --git a/internal/master/param_table.go b/internal/master/param_table.go
index 89f3a6345e..fdd2f933f9 100644
--- a/internal/master/param_table.go
+++ b/internal/master/param_table.go
@@ -21,6 +21,7 @@ type ParamTable struct {
 	KvRootPath            string
 	WriteNodeSegKvSubPath string
 	PulsarAddress         string
+	IndexBuilderAddress   string
 
 	// nodeID
 	ProxyIDList     []typeutil.UniqueID
@@ -49,6 +50,8 @@ type ParamTable struct {
 
 	MaxPartitionNum     int64
 	DefaultPartitionTag string
+
+	LoadIndexChannelNames []string
 }
 
 var Params ParamTable
@@ -71,6 +74,7 @@ func (p *ParamTable) Init() {
 	p.initKvRootPath()
 	p.initWriteNodeSegKvSubPath()
 	p.initPulsarAddress()
+	p.initIndexBuilderAddress()
 
 	p.initProxyIDList()
 	p.initWriteNodeIDList()
@@ -95,6 +99,8 @@ func (p *ParamTable) Init() {
 	p.initMsgChannelSubName()
 	p.initMaxPartitionNum()
 	p.initDefaultPartitionTag()
+
+	p.initLoadIndexChannelNames()
 }
 
 func (p *ParamTable) initAddress() {
@@ -125,6 +131,14 @@ func (p *ParamTable) initPulsarAddress() {
 	p.PulsarAddress = addr
 }
 
+func (p *ParamTable) initIndexBuilderAddress() {
+	ret, err := p.Load("_IndexBuilderAddress")
+	if err != nil {
+		panic(err)
+	}
+	p.IndexBuilderAddress = ret
+}
+
 func (p *ParamTable) initMetaRootPath() {
 	rootPath, err := p.Load("etcd.rootPath")
 	if err != nil {
@@ -346,3 +360,11 @@ func (p *ParamTable) initDefaultPartitionTag() {
 
 	p.DefaultPartitionTag = defaultTag
 }
+
+func (p *ParamTable) initLoadIndexChannelNames() {
+	loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd")
+	if err != nil {
+		panic(err)
+	}
+	p.LoadIndexChannelNames = []string{loadIndexChannelName}
+}
diff --git a/internal/master/param_table_test.go b/internal/master/param_table_test.go
index 8128c5f180..750021666c 100644
--- a/internal/master/param_table_test.go
+++ b/internal/master/param_table_test.go
@@ -31,6 +31,11 @@ func TestParamTable_KVRootPath(t *testing.T) {
 	assert.Equal(t, path, "by-dev/kv")
 }
 
+func TestParamTable_IndexBuilderAddress(t *testing.T) {
+	path := Params.IndexBuilderAddress
+	assert.Equal(t, path, "localhost:31000")
+}
+
 func TestParamTable_TopicNum(t *testing.T) {
 	num := Params.TopicNum
 	fmt.Println("TopicNum:", num)
diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go
index ef8d7f56f0..229b47765b 100644
--- a/internal/util/paramtable/paramtable.go
+++ b/internal/util/paramtable/paramtable.go
@@ -57,19 +57,40 @@ func (gp *BaseTable) Init() {
 	if err != nil {
 		panic(err)
 	}
+	gp.tryloadFromEnv()
+
+}
+
+func (gp *BaseTable) tryloadFromEnv() {
 
 	minioAddress := os.Getenv("MINIO_ADDRESS")
 	if minioAddress == "" {
-		minioAddress = "localhost:9000"
+		minioHost, err := gp.Load("minio.address")
+		if err != nil {
+			panic(err)
+		}
+		port, err := gp.Load("minio.port")
+		if err != nil {
+			panic(err)
+		}
+		minioAddress = minioHost + ":" + port
 	}
-	err = gp.Save("_MinioAddress", minioAddress)
+	err := gp.Save("_MinioAddress", minioAddress)
 	if err != nil {
 		panic(err)
 	}
 
 	etcdAddress := os.Getenv("ETCD_ADDRESS")
 	if etcdAddress == "" {
-		etcdAddress = "localhost:2379"
+		etcdHost, err := gp.Load("etcd.address")
+		if err != nil {
+			panic(err)
+		}
+		port, err := gp.Load("etcd.port")
+		if err != nil {
+			panic(err)
+		}
+		etcdAddress = etcdHost + ":" + port
 	}
 	err = gp.Save("_EtcdAddress", etcdAddress)
 	if err != nil {
@@ -78,7 +99,15 @@ func (gp *BaseTable) Init() {
 
 	pulsarAddress := os.Getenv("PULSAR_ADDRESS")
 	if pulsarAddress == "" {
-		pulsarAddress = "pulsar://localhost:6650"
+		pulsarHost, err := gp.Load("pulsar.address")
+		if err != nil {
+			panic(err)
+		}
+		port, err := gp.Load("pulsar.port")
+		if err != nil {
+			panic(err)
+		}
+		pulsarAddress = "pulsar://" + pulsarHost + ":" + port
 	}
 	err = gp.Save("_PulsarAddress", pulsarAddress)
 	if err != nil {
@@ -87,12 +116,37 @@ func (gp *BaseTable) Init() {
 
 	masterAddress := os.Getenv("MASTER_ADDRESS")
 	if masterAddress == "" {
-		masterAddress = "localhost:53100"
+		masterHost, err := gp.Load("master.address")
+		if err != nil {
+			panic(err)
+		}
+		port, err := gp.Load("master.port")
+		if err != nil {
+			panic(err)
+		}
+		masterAddress = masterHost + ":" + port
 	}
 	err = gp.Save("_MasterAddress", masterAddress)
 	if err != nil {
 		panic(err)
 	}
+
+	indexBuilderAddress := os.Getenv("INDEX_BUILDER_ADDRESS")
+	if indexBuilderAddress == "" {
+		indexBuilderHost, err := gp.Load("indexBuilder.address")
+		if err != nil {
+			panic(err)
+		}
+		port, err := gp.Load("indexBuilder.port")
+		if err != nil {
+			panic(err)
+		}
+		indexBuilderAddress = indexBuilderHost + ":" + port
+	}
+	err = gp.Save("_IndexBuilderAddress", indexBuilderAddress)
+	if err != nil {
+		panic(err)
+	}
 }
 
 func (gp *BaseTable) Load(key string) (string, error) {