Remove mock client in master

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2020-12-31 11:20:03 +08:00 committed by yefu.chen
parent 248be309fd
commit 0a383608f3
8 changed files with 113 additions and 11 deletions

View File

@ -90,12 +90,12 @@ func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, er
} }
type LoadIndexClient interface { type LoadIndexClient interface {
LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error
} }
type MockLoadIndexClient struct { type MockLoadIndexClient struct {
} }
func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error { func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error {
return nil return nil
} }

View File

@ -133,6 +133,7 @@ func (scheduler *IndexBuildScheduler) describe() error {
fieldID: indexBuildInfo.fieldID, fieldID: indexBuildInfo.fieldID,
fieldName: fieldName, fieldName: fieldName,
indexFilePaths: filePaths, indexFilePaths: filePaths,
indexParams: channelInfo.indexParams,
} }
// Save data to meta table // Save data to meta table
err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{ err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{

View File

@ -3,12 +3,15 @@ package master
import ( import (
"context" "context"
"log" "log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
) )
type IndexLoadInfo struct { type IndexLoadInfo struct {
segmentID UniqueID segmentID UniqueID
fieldID UniqueID fieldID UniqueID
fieldName string fieldName string
indexParams []*commonpb.KeyValuePair
indexFilePaths []string indexFilePaths []string
} }
@ -36,7 +39,11 @@ func NewIndexLoadScheduler(ctx context.Context, client LoadIndexClient, metaTabl
func (scheduler *IndexLoadScheduler) schedule(info interface{}) error { func (scheduler *IndexLoadScheduler) schedule(info interface{}) error {
indexLoadInfo := info.(*IndexLoadInfo) indexLoadInfo := info.(*IndexLoadInfo)
err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName) indexParams := make(map[string]string)
for _, kv := range indexLoadInfo.indexParams {
indexParams[kv.Key] = kv.Value
}
err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName, indexParams)
//TODO: Save data to meta table //TODO: Save data to meta table
if err != nil { if err != nil {
return err return err

View File

@ -68,6 +68,7 @@ func (task *createIndexTask) Execute() error {
fieldID: fieldID, fieldID: fieldID,
fieldName: task.req.FieldName, fieldName: task.req.FieldName,
indexFilePaths: indexMeta.IndexFilePaths, indexFilePaths: indexMeta.IndexFilePaths,
indexParams: indexMeta.IndexParams,
}) })
if err != nil { if err != nil {
return err return err

View File

@ -10,6 +10,12 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"
writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream" ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@ -175,9 +181,15 @@ func CreateServer(ctx context.Context) (*Master, error) {
m.scheduler.SetDDMsgStream(pulsarDDStream) m.scheduler.SetDDMsgStream(pulsarDDStream)
m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() }) m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
flushClient := &MockWriteNodeClient{} flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, kvRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
buildIndexClient := &MockBuildIndexClient{} if err != nil {
loadIndexClient := &MockLoadIndexClient{} return nil, err
}
buildIndexClient, err := indexbuilderclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress)
if err != nil {
return nil, err
}
loadIndexClient := client.NewLoadIndexClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames)
m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable) m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable)
m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch) m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch)

View File

@ -21,6 +21,7 @@ type ParamTable struct {
KvRootPath string KvRootPath string
WriteNodeSegKvSubPath string WriteNodeSegKvSubPath string
PulsarAddress string PulsarAddress string
IndexBuilderAddress string
// nodeID // nodeID
ProxyIDList []typeutil.UniqueID ProxyIDList []typeutil.UniqueID
@ -49,6 +50,8 @@ type ParamTable struct {
MaxPartitionNum int64 MaxPartitionNum int64
DefaultPartitionTag string DefaultPartitionTag string
LoadIndexChannelNames []string
} }
var Params ParamTable var Params ParamTable
@ -71,6 +74,7 @@ func (p *ParamTable) Init() {
p.initKvRootPath() p.initKvRootPath()
p.initWriteNodeSegKvSubPath() p.initWriteNodeSegKvSubPath()
p.initPulsarAddress() p.initPulsarAddress()
p.initIndexBuilderAddress()
p.initProxyIDList() p.initProxyIDList()
p.initWriteNodeIDList() p.initWriteNodeIDList()
@ -95,6 +99,8 @@ func (p *ParamTable) Init() {
p.initMsgChannelSubName() p.initMsgChannelSubName()
p.initMaxPartitionNum() p.initMaxPartitionNum()
p.initDefaultPartitionTag() p.initDefaultPartitionTag()
p.initLoadIndexChannelNames()
} }
func (p *ParamTable) initAddress() { func (p *ParamTable) initAddress() {
@ -125,6 +131,14 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = addr p.PulsarAddress = addr
} }
func (p *ParamTable) initIndexBuilderAddress() {
ret, err := p.Load("_IndexBuilderAddress")
if err != nil {
panic(err)
}
p.IndexBuilderAddress = ret
}
func (p *ParamTable) initMetaRootPath() { func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath") rootPath, err := p.Load("etcd.rootPath")
if err != nil { if err != nil {
@ -346,3 +360,11 @@ func (p *ParamTable) initDefaultPartitionTag() {
p.DefaultPartitionTag = defaultTag p.DefaultPartitionTag = defaultTag
} }
func (p *ParamTable) initLoadIndexChannelNames() {
loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd")
if err != nil {
panic(err)
}
p.LoadIndexChannelNames = []string{loadIndexChannelName}
}

View File

@ -31,6 +31,11 @@ func TestParamTable_KVRootPath(t *testing.T) {
assert.Equal(t, path, "by-dev/kv") assert.Equal(t, path, "by-dev/kv")
} }
func TestParamTable_IndexBuilderAddress(t *testing.T) {
path := Params.IndexBuilderAddress
assert.Equal(t, path, "localhost:31000")
}
func TestParamTable_TopicNum(t *testing.T) { func TestParamTable_TopicNum(t *testing.T) {
num := Params.TopicNum num := Params.TopicNum
fmt.Println("TopicNum:", num) fmt.Println("TopicNum:", num)

View File

@ -57,19 +57,40 @@ func (gp *BaseTable) Init() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
gp.tryloadFromEnv()
}
func (gp *BaseTable) tryloadFromEnv() {
minioAddress := os.Getenv("MINIO_ADDRESS") minioAddress := os.Getenv("MINIO_ADDRESS")
if minioAddress == "" { if minioAddress == "" {
minioAddress = "localhost:9000" minioHost, err := gp.Load("minio.address")
if err != nil {
panic(err)
}
port, err := gp.Load("minio.port")
if err != nil {
panic(err)
}
minioAddress = minioHost + ":" + port
} }
err = gp.Save("_MinioAddress", minioAddress) err := gp.Save("_MinioAddress", minioAddress)
if err != nil { if err != nil {
panic(err) panic(err)
} }
etcdAddress := os.Getenv("ETCD_ADDRESS") etcdAddress := os.Getenv("ETCD_ADDRESS")
if etcdAddress == "" { if etcdAddress == "" {
etcdAddress = "localhost:2379" etcdHost, err := gp.Load("etcd.address")
if err != nil {
panic(err)
}
port, err := gp.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddress = etcdHost + ":" + port
} }
err = gp.Save("_EtcdAddress", etcdAddress) err = gp.Save("_EtcdAddress", etcdAddress)
if err != nil { if err != nil {
@ -78,7 +99,15 @@ func (gp *BaseTable) Init() {
pulsarAddress := os.Getenv("PULSAR_ADDRESS") pulsarAddress := os.Getenv("PULSAR_ADDRESS")
if pulsarAddress == "" { if pulsarAddress == "" {
pulsarAddress = "pulsar://localhost:6650" pulsarHost, err := gp.Load("pulsar.address")
if err != nil {
panic(err)
}
port, err := gp.Load("pulsar.port")
if err != nil {
panic(err)
}
pulsarAddress = "pulsar://" + pulsarHost + ":" + port
} }
err = gp.Save("_PulsarAddress", pulsarAddress) err = gp.Save("_PulsarAddress", pulsarAddress)
if err != nil { if err != nil {
@ -87,12 +116,37 @@ func (gp *BaseTable) Init() {
masterAddress := os.Getenv("MASTER_ADDRESS") masterAddress := os.Getenv("MASTER_ADDRESS")
if masterAddress == "" { if masterAddress == "" {
masterAddress = "localhost:53100" masterHost, err := gp.Load("master.address")
if err != nil {
panic(err)
}
port, err := gp.Load("master.port")
if err != nil {
panic(err)
}
masterAddress = masterHost + ":" + port
} }
err = gp.Save("_MasterAddress", masterAddress) err = gp.Save("_MasterAddress", masterAddress)
if err != nil { if err != nil {
panic(err) panic(err)
} }
indexBuilderAddress := os.Getenv("INDEX_BUILDER_ADDRESS")
if indexBuilderAddress == "" {
indexBuilderHost, err := gp.Load("indexBuilder.address")
if err != nil {
panic(err)
}
port, err := gp.Load("indexBuilder.port")
if err != nil {
panic(err)
}
indexBuilderAddress = indexBuilderHost + ":" + port
}
err = gp.Save("_IndexBuilderAddress", indexBuilderAddress)
if err != nil {
panic(err)
}
} }
func (gp *BaseTable) Load(key string) (string, error) { func (gp *BaseTable) Load(key string) (string, error) {