Fix ci: paramstable and startup logic refactor

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2021-02-23 11:40:30 +08:00 committed by yefu.chen
parent 7a7a73e89c
commit bbfcbbdd68
56 changed files with 1615 additions and 1456 deletions

View File

@ -43,8 +43,8 @@ dir ('build/docker/deploy') {
sh 'docker pull registry.zilliz.com/milvus-distributed/milvus-distributed-dev:latest || true'
sh 'docker pull ${SOURCE_REPO}/querynode:${SOURCE_TAG} || true'
sh 'docker-compose build --force-rm querynode'
sh 'docker-compose push querynode'
sh 'docker-compose build --force-rm querynode1'
sh 'docker-compose push querynode1'
sh 'docker pull registry.zilliz.com/milvus-distributed/milvus-distributed-dev:latest || true'
sh 'docker pull ${SOURCE_REPO}/datanode:${SOURCE_TAG} || true'

View File

@ -6,15 +6,15 @@ try {
dir ('build/docker/deploy') {
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} pull'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d master'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxyservice'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d indexservice'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d indexnode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d queryservice'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxyservice'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d dataservice'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=1 -d querynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=2 -d querynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d queryservice'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e DATA_NODE_ID=3 -d datanode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxynode'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=1 -d querynode1'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=2 -d querynode2'
}
dir ('build/docker/test') {

View File

@ -7,6 +7,10 @@ ETCD_ADDRESS=etcd:2379
MASTER_ADDRESS=master:53100
MINIO_ADDRESS=minio:9000
PROXY_NODE_HOST=proxynode
INDEX_NODE_HOST=indexnode
QUERY_NODE_HOST1=querynode1
QUERY_NODE_HOST2=querynode2
DATA_NODE_HOST=datanode
PROXY_SERVICE_ADDRESS=proxyservice:19530
INDEX_SERVICE_ADDRESS=indexservice:31000
DATA_SERVICE_ADDRESS=dataservice:13333

View File

@ -63,7 +63,7 @@ services:
networks:
- milvus
querynode:
querynode1:
image: ${TARGET_REPO}/querynode:${TARGET_TAG}
build:
context: ../../../
@ -76,6 +76,26 @@ services:
MINIO_ADDRESS: ${MINIO_ADDRESS}
DATA_SERVICE_ADDRESS: ${DATA_SERVICE_ADDRESS}
INDEX_SERVICE_ADDRESS: ${INDEX_SERVICE_ADDRESS}
QUERY_SERVICE_ADDRESS: ${QUERY_SERVICE_ADDRESS}
# QUERY_NODE_HOST: ${QUERY_NODE_HOST1}
networks:
- milvus
querynode2:
image: ${TARGET_REPO}/querynode:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/querynode/Dockerfile
cache_from:
- ${SOURCE_REPO}/querynode:${SOURCE_TAG}
environment:
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
MINIO_ADDRESS: ${MINIO_ADDRESS}
DATA_SERVICE_ADDRESS: ${DATA_SERVICE_ADDRESS}
INDEX_SERVICE_ADDRESS: ${INDEX_SERVICE_ADDRESS}
QUERY_SERVICE_ADDRESS: ${QUERY_SERVICE_ADDRESS}
# QUERY_NODE_HOST: ${QUERY_NODE_HOST2}
networks:
- milvus
@ -92,6 +112,9 @@ services:
MASTER_ADDRESS: ${MASTER_ADDRESS}
MINIO_ADDRESS: ${MINIO_ADDRESS}
DATA_SERVICE_ADDRESS: ${DATA_SERVICE_ADDRESS}
# DATA_NODE_HOST: ${DATA_NODE_HOST}
depends_on:
- "dataservice"
networks:
- milvus
@ -119,6 +142,7 @@ services:
environment:
INDEX_SERVICE_ADDRESS: ${INDEX_SERVICE_ADDRESS}
MINIO_ADDRESS: ${MINIO_ADDRESS}
INDEX_NODE_HOST: ${INDEX_NODE_HOST}
depends_on:
- "indexservice"
networks:

View File

@ -3,133 +3,31 @@ package components
import (
"context"
"log"
"time"
dn "github.com/zilliztech/milvus-distributed/internal/datanode"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
dnc "github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
grpcdatanode "github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type DataNode struct {
ctx context.Context
svr *dnc.Server
masterService *msc.GrpcClient
dataService *dsc.Client
svr *grpcdatanode.Server
}
func NewDataNode(ctx context.Context, factory msgstream.Factory) (*DataNode, error) {
const retry = 10
const interval = 200
svr, err := dnc.New(ctx, factory)
svr, err := grpcdatanode.New(ctx, factory)
if err != nil {
panic(err)
}
log.Println("Datanode is", dn.Params.NodeID)
// --- Master Service Client ---
ms.Params.Init()
log.Println("Master service address:", dn.Params.MasterAddress)
log.Println("Init master service client ...")
masterClient, err := msc.NewGrpcClient(dn.Params.MasterAddress, 20*time.Second)
if err != nil {
panic(err)
}
if err = masterClient.Init(); err != nil {
panic(err)
}
if err = masterClient.Start(); err != nil {
panic(err)
}
var cnt int
for cnt = 0; cnt < retry; cnt++ {
time.Sleep(time.Duration(cnt*interval) * time.Millisecond)
if cnt != 0 {
log.Println("Master service isn't ready ...")
log.Printf("Retrying getting master service's states in ... %v ms", interval)
}
msStates, err := masterClient.GetComponentStates()
if err != nil {
continue
}
if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if msStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Master service isn't ready")
}
if err := svr.SetMasterServiceInterface(masterClient); err != nil {
panic(err)
}
// --- Data Service Client ---
log.Println("Data service address: ", dn.Params.ServiceAddress)
log.Println("Init data service client ...")
dataService := dsc.NewClient(dn.Params.ServiceAddress)
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < retry; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Data service isn't ready")
}
if err := svr.SetDataServiceInterface(dataService); err != nil {
panic(err)
return nil, err
}
return &DataNode{
ctx: ctx,
svr: svr,
dataService: dataService,
masterService: masterClient,
}, nil
}
func (d *DataNode) Run() error {
if err := d.svr.Init(); err != nil {
panic(err)
}
if err := d.svr.Start(); err != nil {
if err := d.svr.Run(); err != nil {
panic(err)
}
log.Println("Data node successfully started ...")
@ -137,7 +35,8 @@ func (d *DataNode) Run() error {
}
func (d *DataNode) Stop() error {
_ = d.dataService.Stop()
_ = d.masterService.Stop()
return d.svr.Stop()
if err := d.svr.Stop(); err != nil {
return err
}
return nil
}

View File

@ -2,81 +2,38 @@ package components
import (
"context"
"errors"
"log"
"time"
ms "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
)
type DataService struct {
ctx context.Context
server *dataservice.Service
masterClient *ms.GrpcClient
ctx context.Context
svr *grpcdataserviceclient.Server
}
func NewDataService(ctx context.Context, factory msgstream.Factory) (*DataService, error) {
service := dataservice.NewGrpcService(ctx, factory)
dataservice.Params.Init()
client, err := ms.NewGrpcClient(dataservice.Params.MasterAddress, 30*time.Second)
s, err := grpcdataserviceclient.NewServer(ctx, factory)
if err != nil {
return nil, err
}
log.Println("master client create complete")
if err = client.Init(); err != nil {
return nil, err
}
if err = client.Start(); err != nil {
return nil, err
}
ticker := time.NewTicker(500 * time.Millisecond)
tctx, tcancel := context.WithTimeout(ctx, 30*time.Second)
defer func() {
ticker.Stop()
tcancel()
}()
for {
var states *internalpb2.ComponentStates
select {
case <-ticker.C:
states, err = client.GetComponentStates()
if err != nil {
continue
}
case <-tctx.Done():
return nil, errors.New("master client connect timeout")
}
if states.State.StateCode == internalpb2.StateCode_INITIALIZING || states.State.StateCode == internalpb2.StateCode_HEALTHY {
break
}
}
service.SetMasterClient(client)
return &DataService{
ctx: ctx,
server: service,
masterClient: client,
ctx: ctx,
svr: s,
}, nil
}
func (s *DataService) Run() error {
if err := s.server.Init(); err != nil {
return err
}
if err := s.server.Start(); err != nil {
if err := s.svr.Run(); err != nil {
return err
}
return nil
}
func (s *DataService) Stop() error {
_ = s.masterClient.Stop()
_ = s.server.Stop()
if err := s.svr.Stop(); err != nil {
return err
}
return nil
}

View File

@ -2,170 +2,39 @@ package components
import (
"context"
"fmt"
"log"
"time"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
)
type MasterService struct {
ctx context.Context
svr *msc.GrpcServer
proxyService *psc.Client
dataService *dsc.Client
indexService *isc.Client
queryService *qsc.Client
svr *msc.Server
}
func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) {
const reTryCnt = 3
svr, err := msc.NewGrpcServer(ctx, factory)
svr, err := msc.NewServer(ctx, factory)
if err != nil {
return nil, err
}
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port)
cnt := 0
ps.Params.Init()
log.Printf("proxy service address : %s", ps.Params.ServiceAddress)
proxyService := psc.NewClient(ps.Params.ServiceAddress)
if err = proxyService.Init(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
pxStates, err := proxyService.GetComponentStates()
if err != nil {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error())
continue
}
if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason)
continue
}
if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if err = svr.SetProxyService(proxyService); err != nil {
panic(err)
}
ds.Params.Init()
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("retry cout = %d, error = %s", cnt, dsStates.Status.Reason)
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= reTryCnt {
panic("connect to data service failed")
}
if err = svr.SetDataService(dataService); err != nil {
panic(err)
}
is.Params.Init()
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err = indexService.Init(); err != nil {
return nil, err
}
if err = svr.SetIndexService(indexService); err != nil {
return nil, err
}
qs.Params.Init()
queryService, err := qsc.NewClient(qs.Params.Address, time.Duration(ms.Params.Timeout)*time.Second)
if err != nil {
return nil, err
}
if err = queryService.Init(); err != nil {
return nil, err
}
if err = queryService.Start(); err != nil {
return nil, err
}
if err = svr.SetQueryService(queryService); err != nil {
return nil, err
}
return &MasterService{
ctx: ctx,
svr: svr,
proxyService: proxyService,
dataService: dataService,
indexService: indexService,
queryService: queryService,
}, nil
}
func (m *MasterService) Run() error {
if err := m.svr.Init(); err != nil {
return err
}
if err := m.svr.Start(); err != nil {
if err := m.svr.Run(); err != nil {
return err
}
return nil
}
func (m *MasterService) Stop() error {
if m != nil {
if m.proxyService != nil {
_ = m.proxyService.Stop()
}
if m.indexService != nil {
_ = m.indexService.Stop()
}
if m.dataService != nil {
_ = m.dataService.Stop()
}
if m.queryService != nil {
_ = m.queryService.Stop()
}
if m.svr != nil {
return m.svr.Stop()
}
if err := m.svr.Stop(); err != nil {
return err
}
return nil
}

View File

@ -2,243 +2,33 @@ package components
import (
"context"
"errors"
"fmt"
"log"
"time"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
qns "github.com/zilliztech/milvus-distributed/internal/distributed/querynode"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
grpcquerynode "github.com/zilliztech/milvus-distributed/internal/distributed/querynode"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type QueryNode struct {
ctx context.Context
svr *qns.Server
dataService *dsc.Client
masterService *msc.GrpcClient
indexService *isc.Client
queryService *qsc.Client
svr *grpcquerynode.Server
}
func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, error) {
const retry = 10
const interval = 500
svr, err := qns.NewServer(ctx, factory)
svr, err := grpcquerynode.NewServer(ctx, factory)
if err != nil {
panic(err)
}
// --- QueryService ---
qs.Params.Init()
log.Println("QueryService address:", qs.Params.Address)
log.Println("Init Query service client ...")
queryService, err := qsc.NewClient(qs.Params.Address, 20*time.Second)
if err != nil {
panic(err)
}
if err = queryService.Init(); err != nil {
panic(err)
}
if err = queryService.Start(); err != nil {
panic(err)
}
var cnt int
for cnt = 0; cnt < retry; cnt++ {
if cnt != 0 {
log.Println("Query service isn't ready ...")
log.Printf("Retrying getting query service's states in ... %v ms", interval)
}
qsStates, err := queryService.GetComponentStates()
if err != nil {
continue
}
if qsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if qsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && qsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Query service isn't ready")
}
if err := svr.SetQueryService(queryService); err != nil {
panic(err)
}
// --- Master Service Client ---
ms.Params.Init()
addr := fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port)
log.Println("Master service address:", addr)
log.Println("Init master service client ...")
var masterService *msc.GrpcClient = nil
if QueryMock {
svr.SetMasterService(&qns.MasterServiceMock{Count: 0})
} else {
masterService, err = msc.NewGrpcClient(addr, 20*time.Second)
if err != nil {
panic(err)
}
if err = masterService.Init(); err != nil {
panic(err)
}
if err = masterService.Start(); err != nil {
panic(err)
}
ticker := time.NewTicker(interval * time.Millisecond)
tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond)
defer func() {
ticker.Stop()
tcancel()
}()
for {
var states *internalpb2.ComponentStates
select {
case <-ticker.C:
states, err = masterService.GetComponentStates()
if err != nil {
continue
}
case <-tctx.Done():
return nil, errors.New("master client connect timeout")
}
if states.State.StateCode == internalpb2.StateCode_HEALTHY {
break
}
}
if err := svr.SetMasterService(masterService); err != nil {
panic(err)
}
}
// --- IndexService ---
is.Params.Init()
log.Println("Index service address:", is.Params.Address)
var indexService *isc.Client = nil
if QueryMock {
svr.SetIndexService(&qns.IndexServiceMock{Count: 0})
} else {
indexService = isc.NewClient(is.Params.Address)
if err := indexService.Init(); err != nil {
panic(err)
}
if err := indexService.Start(); err != nil {
panic(err)
}
ticker := time.NewTicker(interval * time.Millisecond)
tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond)
defer func() {
ticker.Stop()
tcancel()
}()
for {
var states *internalpb2.ComponentStates
select {
case <-ticker.C:
states, err = indexService.GetComponentStates()
if err != nil {
continue
}
case <-tctx.Done():
return nil, errors.New("Index service client connect timeout")
}
if states.State.StateCode == internalpb2.StateCode_HEALTHY {
break
}
}
if err := svr.SetIndexService(indexService); err != nil {
panic(err)
}
}
// --- DataService ---
ds.Params.Init()
log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port)
log.Println("Init data service client ...")
var dataService *dsc.Client = nil
if QueryMock {
svr.SetDataService(&qns.DataServiceMock{Count: 0})
} else {
dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < retry; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Data service isn't ready")
}
if err := svr.SetDataService(dataService); err != nil {
panic(err)
}
return nil, err
}
return &QueryNode{
ctx: ctx,
svr: svr,
dataService: dataService,
masterService: masterService,
indexService: indexService,
queryService: queryService,
}, nil
}
func (q *QueryNode) Run() error {
if err := q.svr.Init(); err != nil {
panic(err)
}
if err := q.svr.Start(); err != nil {
if err := q.svr.Run(); err != nil {
panic(err)
}
log.Println("Query node successfully started ...")
@ -246,11 +36,8 @@ func (q *QueryNode) Run() error {
}
func (q *QueryNode) Stop() error {
if !QueryMock {
_ = q.dataService.Stop()
_ = q.masterService.Stop()
_ = q.indexService.Stop()
if err := q.svr.Stop(); err != nil {
return err
}
_ = q.queryService.Stop()
return q.svr.Stop()
return nil
}

View File

@ -2,161 +2,40 @@ package components
import (
"context"
"fmt"
"log"
"time"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
qs "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
grpcqueryservice "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/queryservice"
)
type QueryService struct {
ctx context.Context
svr *qs.Server
dataService *dsc.Client
masterService *msc.GrpcClient
svr *grpcqueryservice.Server
}
const (
QueryMock = false
)
func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) {
const retry = 10
const interval = 200
queryservice.Params.Init()
svr, err := qs.NewServer(ctx, factory)
svr, err := grpcqueryservice.NewServer(ctx, factory)
if err != nil {
panic(err)
}
log.Println("Queryservice id is", queryservice.Params.QueryServiceID)
// --- Master Service Client ---
ms.Params.Init()
log.Printf("Master service address: %s:%d", ms.Params.Address, ms.Params.Port)
log.Println("Init master service client ...")
var masterService *msc.GrpcClient = nil
if QueryMock {
masterMock := queryservice.NewMasterMock()
svr.SetMasterService(masterMock)
} else {
masterService, err = msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second)
if err != nil {
panic(err)
}
if err = masterService.Init(); err != nil {
panic(err)
}
if err = masterService.Start(); err != nil {
panic(err)
}
var cnt int
for cnt = 0; cnt < retry; cnt++ {
time.Sleep(time.Duration(cnt*interval) * time.Millisecond)
if cnt != 0 {
log.Println("Master service isn't ready ...")
log.Printf("Retrying getting master service's states in ... %v ms", interval)
}
msStates, err := masterService.GetComponentStates()
if err != nil {
continue
}
if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if msStates.State.StateCode != internalpb2.StateCode_HEALTHY && msStates.State.StateCode != internalpb2.StateCode_INITIALIZING {
continue
}
break
}
if cnt >= retry {
panic("Master service isn't ready")
}
if err := svr.SetMasterService(masterService); err != nil {
panic(err)
}
}
// --- Data service client ---
ds.Params.Init()
log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port)
log.Println("Init data service client ...")
var dataService *dsc.Client = nil
if QueryMock {
dataMock := queryservice.NewDataMock()
svr.SetDataService(dataMock)
} else {
dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
var cnt int
for cnt = 0; cnt < retry; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Data service isn't ready")
}
if err := svr.SetDataService(dataService); err != nil {
panic(err)
}
}
return &QueryService{
ctx: ctx,
svr: svr,
dataService: dataService,
masterService: masterService,
ctx: ctx,
svr: svr,
}, nil
}
func (qs *QueryService) Run() error {
if err := qs.svr.Init(); err != nil {
if err := qs.svr.Run(); err != nil {
panic(err)
}
if err := qs.svr.Start(); err != nil {
panic(err)
}
log.Println("Data node successfully started ...")
log.Println("QueryService successfully started ...")
return nil
}
func (qs *QueryService) Stop() error {
if !QueryMock {
_ = qs.dataService.Stop()
_ = qs.masterService.Stop()
if err := qs.svr.Stop(); err != nil {
return err
}
return qs.svr.Stop()
return nil
}

1
go.sum
View File

@ -297,6 +297,7 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible h1:CAG0PUvo1fen+ZEfxKJjFIc8GuuN5RuaBuCAuaP2Hno=
github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible/go.mod h1:iIubILNIN6Jq9h8uiSLrN9L1tuj3iSSFwz3R61skm/A=

View File

@ -78,12 +78,10 @@ type (
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
Params.Init()
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
ctx: ctx2,
cancel: cancel2,
NodeID: Params.NodeID, // GOOSE TODO: How to init
Role: typeutil.DataNodeRole,
watchDm: make(chan struct{}),
@ -94,9 +92,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
replica: nil,
msFactory: factory,
}
node.State.Store(internalpb2.StateCode_INITIALIZING)
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return node
}
@ -130,7 +126,7 @@ func (node *DataNode) Init() error {
},
Address: &commonpb.Address{
Ip: Params.IP,
Port: Params.Port,
Port: int64(Params.Port),
},
}
@ -181,10 +177,14 @@ func (node *DataNode) Init() error {
func (node *DataNode) Start() error {
node.metaService.init()
go node.dataSyncService.start()
node.State.Store(internalpb2.StateCode_HEALTHY)
node.UpdateStateCode(internalpb2.StateCode_HEALTHY)
return nil
}
func (node *DataNode) UpdateStateCode(code internalpb2.StateCode) {
node.State.Store(code)
}
func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,

View File

@ -18,8 +18,8 @@ type ParamTable struct {
// === DataNode Internal Components Configs ===
NodeID UniqueID
IP string // GOOSE TODO load from config file
Port int64
IP string
Port int
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
FlushInsertBufferSize int32
@ -29,8 +29,8 @@ type ParamTable struct {
// === DataNode External Components Configs ===
// --- External Client Address ---
MasterAddress string
ServiceAddress string // GOOSE TODO: init from config file
//MasterAddress string
//ServiceAddress string // GOOSE TODO: init from config file
// --- Pulsar ---
PulsarAddress string
@ -80,8 +80,6 @@ func (p *ParamTable) Init() {
// === DataNode Internal Components Configs ===
p.initNodeID()
p.initIP()
p.initPort()
p.initFlowGraphMaxQueueLength()
p.initFlowGraphMaxParallelism()
p.initFlushInsertBufferSize()
@ -90,10 +88,6 @@ func (p *ParamTable) Init() {
p.initDdBinlogRootPath()
// === DataNode External Components Configs ===
// --- Master ---
p.initMasterAddress()
p.initServiceAddress()
// --- Pulsar ---
p.initPulsarAddress()
@ -140,19 +134,6 @@ func (p *ParamTable) initNodeID() {
p.NodeID = p.ParseInt64("_dataNodeID")
}
func (p *ParamTable) initIP() {
addr, err := p.Load("dataNode.address")
if err != nil {
panic(err)
}
p.IP = addr
}
func (p *ParamTable) initPort() {
port := p.ParseInt64("dataNode.port")
p.Port = port
}
// ---- flowgraph configs ----
func (p *ParamTable) initFlowGraphMaxQueueLength() {
p.FlowGraphMaxQueueLength = p.ParseInt32("dataNode.dataSync.flowGraph.maxQueueLength")
@ -189,29 +170,6 @@ func (p *ParamTable) initDdBinlogRootPath() {
p.DdBinlogRootPath = path.Join(rootPath, "data_definition_log")
}
// ===== DataNode External components configs ====
// ---- Master ----
func (p *ParamTable) initMasterAddress() {
addr, err := p.Load("_MasterAddress")
if err != nil {
panic(err)
}
p.MasterAddress = addr
}
func (p *ParamTable) initServiceAddress() {
addr, err := p.Load("dataService.address")
if err != nil {
panic(err)
}
port, err := p.Load("dataService.port")
if err != nil {
panic(err)
}
p.ServiceAddress = addr + ":" + port
}
// ---- Pulsar ----
func (p *ParamTable) initPulsarAddress() {
url, err := p.Load("_PulsarAddress")

View File

@ -44,11 +44,6 @@ func TestParamTable_DataNode(t *testing.T) {
log.Println("DdBinlogRootPath:", path)
})
t.Run("Test MasterAddress", func(t *testing.T) {
address := Params.MasterAddress
log.Println("MasterAddress:", address)
})
t.Run("Test PulsarAddress", func(t *testing.T) {
address := Params.PulsarAddress
log.Println("PulsarAddress:", address)

View File

@ -13,9 +13,7 @@ import (
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
NodeID int64
NodeID int64
EtcdAddress string
MetaRootPath string
@ -54,8 +52,6 @@ func (p *ParamTable) Init() {
}
// set members
p.initAddress()
p.initPort()
p.initNodeID()
p.initEtcdAddress()
@ -80,18 +76,6 @@ func (p *ParamTable) Init() {
})
}
func (p *ParamTable) initAddress() {
dataserviceAddress, err := p.Load("dataservice.address")
if err != nil {
panic(err)
}
p.Address = dataserviceAddress
}
func (p *ParamTable) initPort() {
p.Port = p.ParseInt("dataservice.port")
}
func (p *ParamTable) initNodeID() {
p.NodeID = p.ParseInt64("dataservice.nodeID")
}

View File

@ -9,12 +9,12 @@ import (
"sync/atomic"
"time"
grpcdatanodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/datanode/client"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/distributed/datanode"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@ -109,7 +109,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
msFactory: factory,
}
s.insertChannels = s.getInsertChannels()
s.state.Store(internalpb2.StateCode_INITIALIZING)
s.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return s, nil
}
@ -158,11 +158,15 @@ func (s *Server) Start() error {
return err
}
s.startServerLoop()
s.state.Store(internalpb2.StateCode_HEALTHY)
s.UpdateStateCode(internalpb2.StateCode_HEALTHY)
log.Debug("start success")
return nil
}
func (s *Server) UpdateStateCode(code internalpb2.StateCode) {
s.state.Store(code)
}
func (s *Server) checkStateIsHealthy() bool {
return s.state.Load().(internalpb2.StateCode) == internalpb2.StateCode_HEALTHY
}
@ -466,11 +470,14 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
log.Info("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port))
node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID)
if err != nil {
return nil, err
}
s.cluster.Register(node)
if s.ddChannelName == "" {
resp, err := s.masterClient.GetDdChannel()
if err = VerifyResponse(resp, err); err != nil {
@ -493,10 +500,11 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
}
func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) {
client := datanode.NewClient(fmt.Sprintf("%s:%d", ip, port))
client := grpcdatanodeclient.NewClient(fmt.Sprintf("%s:%d", ip, port))
if err := client.Init(); err != nil {
return nil, err
}
if err := client.Start(); err != nil {
return nil, err
}

View File

@ -1,21 +1,18 @@
package datanode
package grpcdatanodeclient
import (
"context"
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"google.golang.org/grpc"
)
const (
RPCConnectionTimeout = 30 * time.Second
Retry = 3
)
type Client struct {
ctx context.Context
grpc datapb.DataNodeClient
@ -26,18 +23,23 @@ type Client struct {
func NewClient(address string) *Client {
return &Client{
address: address,
ctx: context.Background(),
}
}
func (c *Client) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout)
defer cancel()
var err error
for i := 0; i < Retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock()); err == nil {
break
connectGrpcFunc := func() error {
log.Println("DataNode connect czs::", c.address)
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(100, time.Millisecond*200, connectGrpcFunc)
if err != nil {
return err
}

View File

@ -0,0 +1,64 @@
package grpcdatanode
import (
"os"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
var Params ParamTable
var once sync.Once
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
MasterAddress string
DataServiceAddress string
}
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initMasterAddress()
pt.initDataServiceAddress()
pt.initPort() // todo random generate
})
}
func (pt *ParamTable) LoadFromArgs() {
}
func (pt *ParamTable) LoadFromEnv() {
Params.IP = funcutil.GetLocalIP()
host := os.Getenv("DATA_NODE_HOST")
if len(host) > 0 {
Params.IP = host
}
}
func (pt *ParamTable) initPort() {
port := pt.ParseInt("dataNode.port")
pt.Port = port
}
func (pt *ParamTable) initMasterAddress() {
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
pt.MasterAddress = ret
}
func (pt *ParamTable) initDataServiceAddress() {
ret, err := pt.Load("_DataServiceAddress")
if err != nil {
panic(err)
}
pt.DataServiceAddress = ret
}

View File

@ -0,0 +1,21 @@
package grpcdatanode
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.Port, 0)
t.Logf("DataNode Port:%d", Params.Port)
assert.NotEqual(t, Params.DataServiceAddress, "")
t.Logf("DataServiceAddress:%s", Params.DataServiceAddress)
assert.NotEqual(t, Params.MasterAddress, "")
t.Logf("MasterAddress:%s", Params.MasterAddress)
}

View File

@ -1,12 +1,19 @@
package datanode
package grpcdatanode
import (
"context"
"sync"
"time"
"log"
"net"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
dn "github.com/zilliztech/milvus-distributed/internal/datanode"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -17,86 +24,183 @@ import (
)
type Server struct {
core *dn.DataNode
grpcServer *grpc.Server
grpcError error
grpcErrMux sync.Mutex
ctx context.Context
cancel context.CancelFunc
impl *dn.DataNode
wg sync.WaitGroup
grpcErrChan chan error
grpcServer *grpc.Server
ctx context.Context
cancel context.CancelFunc
msFactory msgstream.Factory
masterService *msc.GrpcClient
dataService *dsc.Client
}
func New(ctx context.Context, factory msgstream.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
var s = &Server{
ctx: ctx1,
cancel: cancel,
msFactory: factory,
ctx: ctx1,
cancel: cancel,
msFactory: factory,
grpcErrChan: make(chan error),
}
s.core = dn.NewDataNode(s.ctx, s.msFactory)
s.grpcServer = grpc.NewServer()
datapb.RegisterDataNodeServer(s.grpcServer, s)
addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10)
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
s.impl = dn.NewDataNode(s.ctx, s.msFactory)
go func() {
if err = s.grpcServer.Serve(lis); err != nil {
s.grpcErrMux.Lock()
defer s.grpcErrMux.Unlock()
s.grpcError = err
}
}()
s.grpcErrMux.Lock()
err = s.grpcError
s.grpcErrMux.Unlock()
if err != nil {
return nil, err
}
return s, nil
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
addr := ":" + strconv.Itoa(grpcPort)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Printf("DataNode GrpcServer:failed to listen: %v", err)
s.grpcErrChan <- err
return
}
log.Println("DataNode:: addr:", addr)
s.grpcServer = grpc.NewServer()
datapb.RegisterDataNodeServer(s.grpcServer, s)
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
log.Println("DataNode Start Grpc Failed!!!!")
s.grpcErrChan <- err
}
}
func (s *Server) SetMasterServiceInterface(ms dn.MasterServiceInterface) error {
return s.core.SetMasterServiceInterface(ms)
return s.impl.SetMasterServiceInterface(ms)
}
func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
return s.core.SetDataServiceInterface(ds)
return s.impl.SetDataServiceInterface(ds)
}
func (s *Server) Init() error {
return s.core.Init()
}
func (s *Server) Run() error {
func (s *Server) Start() error {
return s.core.Start()
if err := s.init(); err != nil {
return err
}
log.Println("data node init done ...")
if err := s.start(); err != nil {
return err
}
log.Println("data node start done ...")
return nil
}
func (s *Server) Stop() error {
err := s.core.Stop()
s.cancel()
s.grpcServer.GracefulStop()
return err
var err error
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
err = s.impl.Stop()
if err != nil {
return err
}
s.wg.Wait()
return nil
}
func (s *Server) init() error {
Params.Init()
Params.Port = funcutil.GetAvailablePort()
Params.LoadFromEnv()
Params.LoadFromArgs()
log.Println("DataNode, port:", Params.Port)
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
err := <-s.grpcErrChan
if err != nil {
return err
}
// --- Master Server Client ---
log.Println("Master service address:", Params.MasterAddress)
log.Println("Init master service client ...")
masterClient, err := msc.NewClient(Params.MasterAddress, 20*time.Second)
if err != nil {
panic(err)
}
if err = masterClient.Init(); err != nil {
panic(err)
}
if err = masterClient.Start(); err != nil {
panic(err)
}
err = funcutil.WaitForComponentHealthy(masterClient, "MasterService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetMasterServiceInterface(masterClient); err != nil {
panic(err)
}
// --- Data Server Client ---
log.Println("Data service address: ", Params.DataServiceAddress)
log.Println("DataNode Init data service client ...")
dataService := dsc.NewClient(Params.DataServiceAddress)
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
err = funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetDataServiceInterface(dataService); err != nil {
panic(err)
}
dn.Params.Init()
dn.Params.Port = Params.Port
dn.Params.IP = Params.IP
s.impl.NodeID = dn.Params.NodeID
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
if err := s.impl.Init(); err != nil {
log.Println("impl init error: ", err)
return err
}
return nil
}
func (s *Server) start() error {
return s.impl.Start()
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.core.GetComponentStates()
return s.impl.GetComponentStates()
}
func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
return s.core.WatchDmChannels(in)
return s.impl.WatchDmChannels(in)
}
func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {
if s.core.State.Load().(internalpb2.StateCode) != internalpb2.StateCode_HEALTHY {
if s.impl.State.Load().(internalpb2.StateCode) != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "DataNode isn't healthy.",
@ -104,5 +208,5 @@ func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest)
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, s.core.FlushSegments(in)
}, s.impl.FlushSegments(in)
}

View File

@ -1,10 +1,11 @@
package dataservice
package grpcdataserviceclient
import (
"context"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"google.golang.org/grpc"
@ -14,36 +15,36 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
const (
timeout = 30 * time.Second
retry = 3
)
type Client struct {
grpcClient datapb.DataServiceClient
conn *grpc.ClientConn
ctx context.Context
addr string
}
func NewClient(addr string) *Client {
return &Client{
addr: addr,
ctx: context.Background(),
}
}
func (c *Client) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
var err error
for i := 0; i < retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil {
break
connectGrpcFunc := func() error {
conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(100, time.Millisecond*200, connectGrpcFunc)
if err != nil {
return err
}
c.grpcClient = datapb.NewDataServiceClient(c.conn)
return nil
}

View File

@ -1,138 +0,0 @@
package dataservice
import (
"context"
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/dataservice"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type Service struct {
server *dataservice.Server
ctx context.Context
grpcServer *grpc.Server
}
func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
return s.server.GetSegmentInfo(request)
}
func NewGrpcService(ctx context.Context, factory msgstream.Factory) *Service {
s := &Service{}
var err error
s.ctx = ctx
s.server, err = dataservice.CreateServer(s.ctx, factory)
if err != nil {
log.Fatalf("create server error: %s", err.Error())
return nil
}
return s
}
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
s.server.SetMasterClient(masterClient)
}
func (s *Service) Init() error {
var err error
s.grpcServer = grpc.NewServer()
datapb.RegisterDataServiceServer(s.grpcServer, s)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
if err != nil {
return nil
}
c := make(chan struct{})
go func() {
if err2 := s.grpcServer.Serve(lis); err2 != nil {
close(c)
err = err2
}
}()
timer := time.NewTimer(1 * time.Second)
defer timer.Stop()
select {
case <-timer.C:
break
case <-c:
return err
}
return s.server.Init()
}
func (s *Service) Start() error {
return s.server.Start()
}
func (s *Service) Stop() error {
err := s.server.Stop()
s.grpcServer.GracefulStop()
return err
}
func (s *Service) RegisterNode(ctx context.Context, request *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return s.server.RegisterNode(request)
}
func (s *Service) Flush(ctx context.Context, request *datapb.FlushRequest) (*commonpb.Status, error) {
return s.server.Flush(request)
}
func (s *Service) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
return s.server.AssignSegmentID(request)
}
func (s *Service) ShowSegments(ctx context.Context, request *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
return s.server.ShowSegments(request)
}
func (s *Service) GetSegmentStates(ctx context.Context, request *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
return s.server.GetSegmentStates(request)
}
func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
return s.server.GetInsertBinlogPaths(request)
}
func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return s.server.GetInsertChannels(request)
}
func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
return s.server.GetCollectionStatistics(request)
}
func (s *Service) GetPartitionStatistics(ctx context.Context, request *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) {
return s.server.GetPartitionStatistics(request)
}
func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.server.GetComponentStates()
}
func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.server.GetTimeTickChannel()
}
func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.server.GetStatisticsChannel()
}
func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.server.GetSegmentInfoChannel()
}
func (s *Service) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
return s.server.GetCount(request)
}

View File

@ -1,7 +1,6 @@
package dataservice
package grpcdataserviceclient
import (
"os"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
@ -10,6 +9,7 @@ import (
type ParamTable struct {
paramtable.BaseTable
Port int
MasterAddress string
}
@ -19,6 +19,7 @@ var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initPort()
pt.initParams()
pt.LoadFromEnv()
})
@ -29,20 +30,17 @@ func (pt *ParamTable) initParams() {
}
func (pt *ParamTable) LoadFromEnv() {
masterAddress := os.Getenv("MASTER_ADDRESS")
if masterAddress != "" {
pt.MasterAddress = masterAddress
}
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("dataservice.port")
}
func (pt *ParamTable) initMasterAddress() {
masterHost, err := pt.Load("master.address")
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
port, err := pt.Load("master.port")
if err != nil {
panic(err)
}
pt.MasterAddress = masterHost + ":" + port
pt.MasterAddress = ret
}

View File

@ -0,0 +1,212 @@
package grpcdataserviceclient
import (
"context"
"log"
"net"
"strconv"
"sync"
"time"
"google.golang.org/grpc"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/zilliztech/milvus-distributed/internal/dataservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
)
type Server struct {
ctx context.Context
cancel context.CancelFunc
grpcErrChan chan error
wg sync.WaitGroup
impl *dataservice.Server
grpcServer *grpc.Server
masterClient *msc.GrpcClient
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &Server{
ctx: ctx1,
cancel: cancel,
grpcErrChan: make(chan error),
}
var err error
s.impl, err = dataservice.CreateServer(s.ctx, factory)
if err != nil {
return nil, err
}
return s, nil
}
func (s *Server) init() error {
Params.Init()
Params.LoadFromEnv()
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
if err := <-s.grpcErrChan; err != nil {
return err
}
log.Println("DataService:: MasterServicAddr:", Params.MasterAddress)
client, err := msc.NewClient(Params.MasterAddress, 10*time.Second)
if err != nil {
panic(err)
}
log.Println("master client create complete")
if err = client.Init(); err != nil {
panic(err)
}
if err = client.Start(); err != nil {
panic(err)
}
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
err = funcutil.WaitForComponentInitOrHealthy(client, "MasterService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
s.impl.SetMasterClient(client)
dataservice.Params.Init()
if err := s.impl.Init(); err != nil {
log.Println("impl init error: ", err)
return err
}
return nil
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
log.Println("network port: ", grpcPort)
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Printf("GrpcServer:failed to listen: %v", err)
s.grpcErrChan <- err
return
}
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
s.grpcServer = grpc.NewServer()
datapb.RegisterDataServiceServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err
}
}
func (s *Server) start() error {
return s.impl.Start()
}
func (s *Server) Stop() error {
s.cancel()
var err error
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
err = s.impl.Stop()
if err != nil {
return err
}
s.wg.Wait()
return nil
}
func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
log.Println("dataservice init done ...")
if err := s.start(); err != nil {
return err
}
return nil
}
func (s *Server) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
return s.impl.GetSegmentInfo(request)
}
func (s *Server) RegisterNode(ctx context.Context, request *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return s.impl.RegisterNode(request)
}
func (s *Server) Flush(ctx context.Context, request *datapb.FlushRequest) (*commonpb.Status, error) {
return s.impl.Flush(request)
}
func (s *Server) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
return s.impl.AssignSegmentID(request)
}
func (s *Server) ShowSegments(ctx context.Context, request *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
return s.impl.ShowSegments(request)
}
func (s *Server) GetSegmentStates(ctx context.Context, request *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
return s.impl.GetSegmentStates(request)
}
func (s *Server) GetInsertBinlogPaths(ctx context.Context, request *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
return s.impl.GetInsertBinlogPaths(request)
}
func (s *Server) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
return s.impl.GetInsertChannels(request)
}
func (s *Server) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
return s.impl.GetCollectionStatistics(request)
}
func (s *Server) GetPartitionStatistics(ctx context.Context, request *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) {
return s.impl.GetPartitionStatistics(request)
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.impl.GetComponentStates()
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetTimeTickChannel()
}
func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetStatisticsChannel()
}
func (s *Server) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetSegmentInfoChannel()
}
func (s *Server) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
return s.impl.GetCount(request)
}

View File

@ -30,12 +30,10 @@ func (pt *ParamTable) Init() {
})
}
// todo
func (pt *ParamTable) LoadFromArgs() {
}
//todo
func (pt *ParamTable) LoadFromEnv() {
indexServiceAddress := os.Getenv("INDEX_SERVICE_ADDRESS")
if indexServiceAddress != "" {
@ -43,7 +41,7 @@ func (pt *ParamTable) LoadFromEnv() {
}
Params.IP = funcutil.GetLocalIP()
host := os.Getenv("PROXY_NODE_HOST")
host := os.Getenv("INDEX_NODE_HOST")
if len(host) > 0 {
Params.IP = host
}

View File

@ -1,11 +1,10 @@
package masterservice
package grpcmasterserviceclient
import (
"context"
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@ -19,18 +18,20 @@ type GrpcClient struct {
conn *grpc.ClientConn
//inner member
addr string
timeout time.Duration
retry int
addr string
timeout time.Duration
grpcTimeout time.Duration
retry int
}
func NewGrpcClient(addr string, timeout time.Duration) (*GrpcClient, error) {
func NewClient(addr string, timeout time.Duration) (*GrpcClient, error) {
return &GrpcClient{
grpcClient: nil,
conn: nil,
addr: addr,
timeout: timeout,
retry: 3,
grpcClient: nil,
conn: nil,
addr: addr,
timeout: timeout,
grpcTimeout: time.Second * 5,
retry: 3,
}, nil
}
@ -47,7 +48,6 @@ func (c *GrpcClient) Init() error {
return err
}
c.grpcClient = masterpb.NewMasterServiceClient(c.conn)
cms.Params.Init()
return nil
}
@ -59,98 +59,98 @@ func (c *GrpcClient) Stop() error {
}
func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{})
}
//DDL request
func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.CreateCollection(ctx, in)
}
func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.DropCollection(ctx, in)
}
func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.HasCollection(ctx, in)
}
func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.DescribeCollection(ctx, in)
}
func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.ShowCollections(ctx, in)
}
func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.CreatePartition(ctx, in)
}
func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.DropPartition(ctx, in)
}
func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.HasPartition(ctx, in)
}
func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.ShowPartitions(ctx, in)
}
//index builder service
func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.CreateIndex(ctx, in)
}
func (c *GrpcClient) DropIndex(in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.DropIndex(ctx, in)
}
func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.DescribeIndex(ctx, in)
}
//global timestamp allocator
func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.AllocTimestamp(ctx, in)
}
func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.AllocID(ctx, in)
}
//receiver time tick from proxy service, and put it into this channel
func (c *GrpcClient) GetTimeTickChannel() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{})
if err != nil {
@ -164,7 +164,7 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) {
//receive ddl from rpc and time tick from proxy service, and put them into this channel
func (c *GrpcClient) GetDdChannel() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{})
if err != nil {
@ -178,7 +178,7 @@ func (c *GrpcClient) GetDdChannel() (string, error) {
//just define a channel, not used currently
func (c *GrpcClient) GetStatisticsChannel() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{})
if err != nil {
@ -191,13 +191,13 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) {
}
func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.DescribeSegment(ctx, in)
}
func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout)
defer cancel()
return c.grpcClient.ShowSegments(ctx, in)
}

View File

@ -1,14 +1,18 @@
package masterservice
package grpcmasterservice
import (
"context"
"fmt"
"math/rand"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
@ -25,16 +29,23 @@ func TestGrpcService(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
//cms.Params.Address = "127.0.0.1"
cms.Params.Port = (randVal % 100) + 10000
Params.Init()
Params.Port = (randVal % 100) + 10000
parts := strings.Split(Params.Address, ":")
if len(parts) == 2 {
Params.Address = parts[0] + ":" + strconv.Itoa(Params.Port)
t.Log("newParams.Address:", Params.Address)
}
msFactory := pulsarms.NewFactory()
svr, err := NewGrpcServer(context.Background(), msFactory)
svr, err := NewServer(context.Background(), msFactory)
assert.Nil(t, err)
svr.connectQueryService = false
svr.connectProxyService = false
svr.connectIndexService = false
svr.connectDataService = false
// cms.Params.NodeID = 0
//cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650"
//cms.Params.EtcdAddress = "127.0.0.1:2379"
cms.Params.Init()
cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
cms.Params.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal)
cms.Params.ProxyTimeTickChannel = fmt.Sprintf("proxyTimeTick%d", randVal)
@ -48,11 +59,14 @@ func TestGrpcService(t *testing.T) {
cms.Params.DefaultPartitionName = "_default"
cms.Params.DefaultIndexName = "_default"
t.Logf("master service port = %d", cms.Params.Port)
t.Logf("master service port = %d", Params.Port)
core := svr.core.(*cms.Core)
err = svr.startGrpc()
assert.Nil(t, err)
svr.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
err = svr.Init()
core := svr.core
err = core.Init()
assert.Nil(t, err)
core.ProxyTimeTickChan = make(chan typeutil.Timestamp, 8)
@ -126,10 +140,12 @@ func TestGrpcService(t *testing.T) {
return nil
}
err = svr.Start()
err = svr.start()
assert.Nil(t, err)
cli, err := NewGrpcClient(fmt.Sprintf("%s:%d", cms.Params.Address, cms.Params.Port), 3*time.Second)
svr.core.UpdateStateCode(internalpb2.StateCode_HEALTHY)
cli, err := grpcmasterserviceclient.NewClient(Params.Address, 3*time.Second)
assert.Nil(t, err)
err = cli.Init()
@ -178,6 +194,7 @@ func TestGrpcService(t *testing.T) {
status, err := cli.CreateCollection(req)
assert.Nil(t, err)
assert.Equal(t, len(createCollectionArray), 1)
assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
assert.Equal(t, createCollectionArray[0].Base.MsgType, commonpb.MsgType_kCreateCollection)

View File

@ -0,0 +1,83 @@
package grpcmasterservice
import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
var Params ParamTable
var once sync.Once
type ParamTable struct {
paramtable.BaseTable
Address string // ip:port
Port int
ProxyServiceAddress string
IndexServiceAddress string
QueryServiceAddress string
DataServiceAddress string
}
func (p *ParamTable) Init() {
once.Do(func() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/master.yaml")
if err != nil {
panic(err)
}
p.initAddress()
p.initPort()
p.initProxyServiceAddress()
p.initIndexServiceAddress()
p.initQueryServiceAddress()
p.initDataServiceAddress()
})
}
func (p *ParamTable) initAddress() {
ret, err := p.Load("_MasterAddress")
if err != nil {
panic(err)
}
p.Address = ret
}
func (p *ParamTable) initPort() {
p.Port = p.ParseInt("master.port")
}
func (p *ParamTable) initProxyServiceAddress() {
ret, err := p.Load("_PROXY_SERVICE_ADDRESS")
if err != nil {
panic(err)
}
p.ProxyServiceAddress = ret
}
func (p *ParamTable) initIndexServiceAddress() {
ret, err := p.Load("IndexServiceAddress")
if err != nil {
panic(err)
}
p.IndexServiceAddress = ret
}
func (p *ParamTable) initQueryServiceAddress() {
ret, err := p.Load("_QueryServiceAddress")
if err != nil {
panic(err)
}
p.QueryServiceAddress = ret
}
func (p *ParamTable) initDataServiceAddress() {
ret, err := p.Load("_DataServiceAddress")
if err != nil {
panic(err)
}
p.DataServiceAddress = ret
}

View File

@ -0,0 +1,29 @@
package grpcmasterservice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.Address, "")
t.Logf("master address = %s", Params.Address)
assert.NotEqual(t, Params.Port, 0)
t.Logf("master port = %d", Params.Port)
assert.NotEqual(t, Params.IndexServiceAddress, "")
t.Logf("IndexServiceAddress:%s", Params.IndexServiceAddress)
assert.NotEqual(t, Params.DataServiceAddress, "")
t.Logf("DataServiceAddress:%s", Params.DataServiceAddress)
assert.NotEqual(t, Params.QueryServiceAddress, "")
t.Logf("QueryServiceAddress:%s", Params.QueryServiceAddress)
assert.NotEqual(t, Params.ProxyServiceAddress, "")
t.Logf("ProxyServiceAddress:%s", Params.ProxyServiceAddress)
}

View File

@ -1,12 +1,20 @@
package masterservice
package grpcmasterservice
import (
"context"
"fmt"
"log"
"strconv"
"time"
"net"
"sync"
"github.com/zilliztech/milvus-distributed/internal/errors"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -17,161 +25,271 @@ import (
)
// grpc wrapper
type GrpcServer struct {
core cms.Interface
grpcServer *grpc.Server
grpcError error
grpcErrMux sync.Mutex
type Server struct {
core *cms.Core
grpcServer *grpc.Server
grpcErrChan chan error
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
proxyService *psc.Client
dataService *dsc.Client
indexService *isc.Client
queryService *qsc.Client
connectProxyService bool
connectDataService bool
connectIndexService bool
connectQueryService bool
}
func NewGrpcServer(ctx context.Context, factory msgstream.Factory) (*GrpcServer, error) {
s := &GrpcServer{}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &Server{
ctx: ctx1,
cancel: cancel,
grpcErrChan: make(chan error),
connectDataService: true,
connectProxyService: true,
connectIndexService: true,
connectQueryService: true,
}
var err error
s.ctx, s.cancel = context.WithCancel(ctx)
if s.core, err = cms.NewCore(s.ctx, factory); err != nil {
return nil, err
}
s.grpcServer = grpc.NewServer()
s.grpcError = nil
masterpb.RegisterMasterServiceServer(s.grpcServer, s)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cms.Params.Port))
s.core, err = cms.NewCore(s.ctx, factory)
if err != nil {
return nil, err
}
go func() {
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrMux.Lock()
defer s.grpcErrMux.Unlock()
s.grpcError = err
return s, err
}
func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
if err := s.start(); err != nil {
return err
}
return nil
}
func (s *Server) init() error {
Params.Init()
log.Println("init params done")
err := s.startGrpc()
if err != nil {
return err
}
s.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
if s.connectProxyService {
log.Printf("proxy service address : %s", Params.ProxyServiceAddress)
proxyService := psc.NewClient(Params.ProxyServiceAddress)
if err := proxyService.Init(); err != nil {
panic(err)
}
}()
s.grpcErrMux.Lock()
err = s.grpcError
s.grpcErrMux.Unlock()
err := funcutil.WaitForComponentInitOrHealthy(proxyService, "ProxyService", 100, 200*time.Millisecond)
if err != nil {
panic(err)
}
if err != nil {
return nil, err
if err = s.core.SetProxyService(proxyService); err != nil {
panic(err)
}
}
return s, nil
if s.connectDataService {
log.Printf("data service address : %s", Params.DataServiceAddress)
dataService := dsc.NewClient(Params.DataServiceAddress)
if err := dataService.Init(); err != nil {
panic(err)
}
if err := dataService.Start(); err != nil {
panic(err)
}
err := funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, 200*time.Millisecond)
if err != nil {
panic(err)
}
if err = s.core.SetDataService(dataService); err != nil {
panic(err)
}
}
if s.connectIndexService {
log.Printf("index service address : %s", Params.IndexServiceAddress)
indexService := isc.NewClient(Params.IndexServiceAddress)
if err := indexService.Init(); err != nil {
panic(err)
}
if err := s.core.SetIndexService(indexService); err != nil {
panic(err)
}
}
if s.connectQueryService {
queryService, err := qsc.NewClient(Params.QueryServiceAddress, 5*time.Second)
if err != nil {
panic(err)
}
if err = queryService.Init(); err != nil {
panic(err)
}
if err = queryService.Start(); err != nil {
panic(err)
}
if err = s.core.SetQueryService(queryService); err != nil {
panic(err)
}
}
cms.Params.Init()
log.Println("grpc init done ...")
if err := s.core.Init(); err != nil {
return err
}
return nil
}
func (s *GrpcServer) Init() error {
return s.core.Init()
}
func (s *GrpcServer) Start() error {
return s.core.Start()
}
func (s *GrpcServer) Stop() error {
err := s.core.Stop()
s.cancel()
s.grpcServer.GracefulStop()
func (s *Server) startGrpc() error {
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
err := <-s.grpcErrChan
return err
}
func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set proxy service failed")
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
log.Println("network port: ", grpcPort)
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Printf("GrpcServer:failed to listen: %v", err)
s.grpcErrChan <- err
return
}
return c.SetProxyService(p)
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
s.grpcServer = grpc.NewServer()
masterpb.RegisterMasterServiceServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err
}
}
func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set data service failed")
func (s *Server) start() error {
log.Println("Master Core start ...")
if err := s.core.Start(); err != nil {
return err
}
return c.SetDataService(p)
return nil
}
func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set index service failed")
func (s *Server) Stop() error {
if s.proxyService != nil {
_ = s.proxyService.Stop()
}
return c.SetIndexService(p)
if s.indexService != nil {
_ = s.indexService.Stop()
}
if s.dataService != nil {
_ = s.dataService.Stop()
}
if s.queryService != nil {
_ = s.queryService.Stop()
}
if s.core != nil {
return s.core.Stop()
}
s.cancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
s.wg.Wait()
return nil
}
func (s *GrpcServer) SetQueryService(q cms.QueryServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set query service failed")
}
return c.SetQueryService(q)
}
func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
func (s *Server) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.core.GetComponentStates()
}
//DDL request
func (s *GrpcServer) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
func (s *Server) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
return s.core.CreateCollection(in)
}
func (s *GrpcServer) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
func (s *Server) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return s.core.DropCollection(in)
}
func (s *GrpcServer) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
func (s *Server) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
return s.core.HasCollection(in)
}
func (s *GrpcServer) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
func (s *Server) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return s.core.DescribeCollection(in)
}
func (s *GrpcServer) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
func (s *Server) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
return s.core.ShowCollections(in)
}
func (s *GrpcServer) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
func (s *Server) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
return s.core.CreatePartition(in)
}
func (s *GrpcServer) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
func (s *Server) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
return s.core.DropPartition(in)
}
func (s *GrpcServer) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
func (s *Server) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
return s.core.HasPartition(in)
}
func (s *GrpcServer) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
func (s *Server) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
return s.core.ShowPartitions(in)
}
//index builder service
func (s *GrpcServer) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
func (s *Server) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
return s.core.CreateIndex(in)
}
func (s *GrpcServer) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
func (s *Server) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
return s.core.DropIndex(in)
}
func (s *GrpcServer) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
func (s *Server) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
return s.core.DescribeIndex(in)
}
//global timestamp allocator
func (s *GrpcServer) AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
func (s *Server) AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
return s.core.AllocTimestamp(in)
}
func (s *GrpcServer) AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
func (s *Server) AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
return s.core.AllocID(in)
}
//receiver time tick from proxy service, and put it into this channel
func (s *GrpcServer) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
func (s *Server) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
rsp, err := s.core.GetTimeTickChannel()
if err != nil {
return &milvuspb.StringResponse{
@ -192,7 +310,7 @@ func (s *GrpcServer) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb.
}
//receive ddl from rpc and time tick from proxy service, and put them into this channel
func (s *GrpcServer) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) {
func (s *Server) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) {
rsp, err := s.core.GetDdChannel()
if err != nil {
return &milvuspb.StringResponse{
@ -213,7 +331,7 @@ func (s *GrpcServer) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (*
}
//just define a channel, not used currently
func (s *GrpcServer) GetStatisticsChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
func (s *Server) GetStatisticsChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
rsp, err := s.core.GetStatisticsChannel()
if err != nil {
return &milvuspb.StringResponse{
@ -233,10 +351,10 @@ func (s *GrpcServer) GetStatisticsChannelRPC(ctx context.Context, empty *commonp
}, nil
}
func (s *GrpcServer) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
func (s *Server) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
return s.core.DescribeSegment(in)
}
func (s *GrpcServer) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
func (s *Server) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
return s.core.ShowSegments(in)
}

View File

@ -6,6 +6,7 @@ import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
@ -36,12 +37,10 @@ func (pt *ParamTable) Init() {
})
}
// todo
func (pt *ParamTable) LoadFromArgs() {
}
//todo
func (pt *ParamTable) LoadFromEnv() {
masterAddress := os.Getenv("MASTER_ADDRESS")
@ -69,6 +68,12 @@ func (pt *ParamTable) LoadFromEnv() {
pt.DataServiceAddress = dataServiceAddress
}
Params.IP = funcutil.GetLocalIP()
host := os.Getenv("PROXY_NODE_HOST")
if len(host) > 0 {
Params.IP = host
}
}
func (pt *ParamTable) initParams() {

View File

@ -6,18 +6,20 @@ import (
"io"
"log"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
"google.golang.org/grpc"
grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -25,7 +27,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proxynode"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
)
type Server struct {
@ -36,15 +37,9 @@ type Server struct {
grpcErrChan chan error
ip string
port int
//todo
proxyServiceClient *grpcproxyserviceclient.Client
// todo InitParams Service addrs
masterServiceClient *grcpmasterservice.GrpcClient
dataServiceClient *grpcdataservice.Client
proxyServiceClient *grpcproxyserviceclient.Client
masterServiceClient *grpcmasterserviceclient.GrpcClient
dataServiceClient *grpcdataserviceclient.Client
queryServiceClient *grpcqueryserviceclient.Client
indexServiceClient *grpcindexserviceclient.Client
@ -87,7 +82,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
log.Println("network port: ", grpcPort)
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Printf("GrpcServer:failed to listen: %v", err)
log.Printf("Server:failed to listen: %v", err)
s.grpcErrChan <- err
return
}
@ -124,12 +119,6 @@ func (s *Server) init() error {
var err error
Params.Init()
Params.IP = funcutil.GetLocalIP()
host := os.Getenv("PROXY_NODE_HOST")
if len(host) > 0 {
Params.IP = host
}
Params.LoadFromEnv()
Params.LoadFromArgs()
@ -169,7 +158,7 @@ func (s *Server) init() error {
masterServiceAddr := Params.MasterAddress
log.Println("master address: ", masterServiceAddr)
timeout := 3 * time.Second
s.masterServiceClient, err = grcpmasterservice.NewGrpcClient(masterServiceAddr, timeout)
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, timeout)
if err != nil {
return err
}
@ -182,7 +171,7 @@ func (s *Server) init() error {
dataServiceAddr := Params.DataServiceAddress
log.Println("data service address ...", dataServiceAddr)
s.dataServiceClient = grpcdataservice.NewClient(dataServiceAddr)
s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr)
err = s.dataServiceClient.Init()
if err != nil {
return err

View File

@ -62,7 +62,7 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error)
if err != nil {
return nil, err
}
return server, err
return server, nil
}
func (s *Server) Run() error {
@ -133,6 +133,7 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
s.cancel()
s.closer.Close()
err := s.impl.Stop()
if err != nil {
@ -141,7 +142,6 @@ func (s *Server) Stop() error {
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
s.cancel()
s.wg.Wait()
return nil
}

View File

@ -0,0 +1,95 @@
package grpcquerynode
import (
"os"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
var Params ParamTable
var once sync.Once
type ParamTable struct {
paramtable.BaseTable
QueryNodeIP string
QueryNodePort int
QueryNodeID UniqueID
IndexServiceAddress string
MasterAddress string
DataServiceAddress string
QueryServiceAddress string
}
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initMasterAddress()
pt.initIndexServiceAddress()
pt.initDataServiceAddress()
pt.initQueryServiceAddress()
})
}
func (pt *ParamTable) LoadFromArgs() {
}
func (pt *ParamTable) LoadFromEnv() {
// todo assign by queryservice and set by read initparms
queryNodeIDStr := os.Getenv("QUERY_NODE_ID")
if queryNodeIDStr == "" {
panic("Can't Get QUERY_NODE_ID")
}
queryID, err := strconv.Atoi(queryNodeIDStr)
if err != nil {
panic(err)
}
pt.QueryNodeID = UniqueID(queryID)
Params.QueryNodeIP = funcutil.GetLocalIP()
host := os.Getenv("QUERY_NODE_HOST")
if len(host) > 0 {
Params.QueryNodeIP = host
}
}
func (pt *ParamTable) initMasterAddress() {
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
pt.MasterAddress = ret
}
func (pt *ParamTable) initIndexServiceAddress() {
ret, err := pt.Load("IndexServiceAddress")
if err != nil {
panic(err)
}
pt.IndexServiceAddress = ret
}
func (pt *ParamTable) initDataServiceAddress() {
ret, err := pt.Load("_DataServiceAddress")
if err != nil {
panic(err)
}
pt.DataServiceAddress = ret
}
func (pt *ParamTable) initQueryServiceAddress() {
ret, err := pt.Load("_QueryServiceAddress")
if err != nil {
panic(err)
}
pt.QueryServiceAddress = ret
}

View File

@ -0,0 +1,23 @@
package grpcquerynode
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.IndexServiceAddress, "")
t.Logf("IndexServiceAddress:%s", Params.IndexServiceAddress)
assert.NotEqual(t, Params.DataServiceAddress, "")
t.Logf("DataServiceAddress:%s", Params.DataServiceAddress)
assert.NotEqual(t, Params.MasterAddress, "")
t.Logf("MasterAddress:%s", Params.MasterAddress)
assert.NotEqual(t, Params.QueryServiceAddress, "")
t.Logf("QueryServiceAddress:%s", Params.QueryServiceAddress)
}

View File

@ -2,10 +2,18 @@ package grpcquerynode
import (
"context"
"fmt"
"log"
"net"
"strconv"
"sync"
"time"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
@ -14,86 +22,240 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
qn "github.com/zilliztech/milvus-distributed/internal/querynode"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
type Server struct {
node *qn.QueryNode
impl *qn.QueryNode
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
grpcErrChan chan error
grpcServer *grpc.Server
grpcError error
grpcErrMux sync.Mutex
ctx context.Context
cancel context.CancelFunc
dataService *dsc.Client
masterService *msc.GrpcClient
indexService *isc.Client
queryService *qsc.Client
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &Server{
ctx: ctx,
node: qn.NewQueryNodeWithoutID(ctx, factory),
}
qn.Params.Init()
s.grpcServer = grpc.NewServer()
querypb.RegisterQueryNodeServer(s.grpcServer, s)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", qn.Params.QueryNodeIP, qn.Params.QueryNodePort))
if err != nil {
return nil, err
}
go func() {
log.Println("start query node grpc server...")
if err = s.grpcServer.Serve(lis); err != nil {
s.grpcErrMux.Lock()
defer s.grpcErrMux.Unlock()
s.grpcError = err
}
}()
s.grpcErrMux.Lock()
err = s.grpcError
s.grpcErrMux.Unlock()
if err != nil {
return nil, err
ctx: ctx1,
cancel: cancel,
impl: qn.NewQueryNodeWithoutID(ctx, factory),
grpcErrChan: make(chan error),
}
return s, nil
}
func (s *Server) Init() error {
return s.node.Init()
func (s *Server) init() error {
Params.Init()
Params.QueryNodePort = funcutil.GetAvailablePort()
Params.LoadFromEnv()
Params.LoadFromArgs()
log.Println("QueryNode, port:", Params.QueryNodePort)
s.wg.Add(1)
go s.startGrpcLoop(Params.QueryNodePort)
// wait for grpc server loop start
err := <-s.grpcErrChan
if err != nil {
return err
}
// --- QueryService ---
log.Println("QueryService address:", Params.QueryServiceAddress)
log.Println("Init Query service client ...")
queryService, err := qsc.NewClient(Params.QueryServiceAddress, 20*time.Second)
if err != nil {
panic(err)
}
if err = queryService.Init(); err != nil {
panic(err)
}
if err = queryService.Start(); err != nil {
panic(err)
}
err = funcutil.WaitForComponentInitOrHealthy(queryService, "QueryService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetQueryService(queryService); err != nil {
panic(err)
}
// --- Master Server Client ---
//ms.Params.Init()
addr := Params.MasterAddress
log.Println("Master service address:", addr)
log.Println("Init master service client ...")
masterService, err := msc.NewClient(addr, 20*time.Second)
if err != nil {
panic(err)
}
if err = masterService.Init(); err != nil {
panic(err)
}
if err = masterService.Start(); err != nil {
panic(err)
}
err = funcutil.WaitForComponentHealthy(masterService, "MasterService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetMasterService(masterService); err != nil {
panic(err)
}
// --- IndexService ---
log.Println("Index service address:", Params.IndexServiceAddress)
indexService := isc.NewClient(Params.IndexServiceAddress)
if err := indexService.Init(); err != nil {
panic(err)
}
if err := indexService.Start(); err != nil {
panic(err)
}
// wait indexservice healthy
err = funcutil.WaitForComponentHealthy(indexService, "IndexService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetIndexService(indexService); err != nil {
panic(err)
}
// --- DataService ---
log.Printf("Data service address: %s", Params.DataServiceAddress)
log.Println("Querynode Init data service client ...")
dataService := dsc.NewClient(Params.DataServiceAddress)
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
err = funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetDataService(dataService); err != nil {
panic(err)
}
qn.Params.Init()
qn.Params.QueryNodeIP = Params.QueryNodeIP
qn.Params.QueryNodePort = int64(Params.QueryNodePort)
qn.Params.QueryNodeID = Params.QueryNodeID
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
if err := s.impl.Init(); err != nil {
log.Println("impl init error: ", err)
return err
}
return nil
}
func (s *Server) Start() error {
return s.node.Start()
func (s *Server) start() error {
return s.impl.Start()
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
addr := ":" + strconv.Itoa(grpcPort)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Printf("QueryNode GrpcServer:failed to listen: %v", err)
s.grpcErrChan <- err
return
}
log.Println("QueryNode:: addr:", addr)
s.grpcServer = grpc.NewServer()
querypb.RegisterQueryNodeServer(s.grpcServer, s)
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
log.Println("QueryNode Start Grpc Failed!!!!")
s.grpcErrChan <- err
}
}
func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
log.Println("querynode init done ...")
if err := s.start(); err != nil {
return err
}
log.Println("querynode start done ...")
return nil
}
func (s *Server) Stop() error {
err := s.node.Stop()
s.cancel()
s.grpcServer.GracefulStop()
return err
var err error
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
err = s.impl.Stop()
if err != nil {
return err
}
s.wg.Wait()
return nil
}
func (s *Server) SetMasterService(master qn.MasterServiceInterface) error {
return s.node.SetMasterService(master)
return s.impl.SetMasterService(master)
}
func (s *Server) SetQueryService(query qn.QueryServiceInterface) error {
return s.node.SetQueryService(query)
return s.impl.SetQueryService(query)
}
func (s *Server) SetIndexService(index qn.IndexServiceInterface) error {
return s.node.SetIndexService(index)
return s.impl.SetIndexService(index)
}
func (s *Server) SetDataService(data qn.DataServiceInterface) error {
return s.node.SetDataService(data)
return s.impl.SetDataService(data)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) {
// ignore ctx and in
channel, err := s.node.GetTimeTickChannel()
channel, err := s.impl.GetTimeTickChannel()
if err != nil {
return nil, err
}
@ -107,7 +269,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*m
func (s *Server) GetStatsChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) {
// ignore ctx and in
channel, err := s.node.GetStatisticsChannel()
channel, err := s.impl.GetStatisticsChannel()
if err != nil {
return nil, err
}
@ -121,7 +283,7 @@ func (s *Server) GetStatsChannel(ctx context.Context, in *commonpb.Empty) (*milv
func (s *Server) GetComponentStates(ctx context.Context, in *commonpb.Empty) (*querypb.ComponentStatesResponse, error) {
// ignore ctx and in
componentStates, err := s.node.GetComponentStates()
componentStates, err := s.impl.GetComponentStates()
if err != nil {
return nil, err
}
@ -135,29 +297,29 @@ func (s *Server) GetComponentStates(ctx context.Context, in *commonpb.Empty) (*q
func (s *Server) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) {
// ignore ctx
return s.node.AddQueryChannel(in)
return s.impl.AddQueryChannel(in)
}
func (s *Server) RemoveQueryChannel(ctx context.Context, in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
// ignore ctx
return s.node.RemoveQueryChannel(in)
return s.impl.RemoveQueryChannel(in)
}
func (s *Server) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
// ignore ctx
return s.node.WatchDmChannels(in)
return s.impl.WatchDmChannels(in)
}
func (s *Server) LoadSegments(ctx context.Context, in *querypb.LoadSegmentRequest) (*commonpb.Status, error) {
// ignore ctx
return s.node.LoadSegments(in)
return s.impl.LoadSegments(in)
}
func (s *Server) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) {
// ignore ctx
return s.node.ReleaseSegments(in)
return s.impl.ReleaseSegments(in)
}
func (s *Server) GetSegmentInfo(ctx context.Context, in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return s.node.GetSegmentInfo(in)
return s.impl.GetSegmentInfo(in)
}

View File

@ -1,88 +0,0 @@
package grpcquerynode
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"testing"
"time"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/querynode"
)
const (
debug = true
ctxTimeInMillisecond = 2000
)
func TestQueryNodeDistributed_Service(t *testing.T) {
// Creates server.
var ctx context.Context
var cancel context.CancelFunc
if debug {
ctx, cancel = context.WithCancel(context.Background())
} else {
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
}
go mockMain(ctx)
<-ctx.Done()
cancel()
}
func mockMain(ctx context.Context) {
svr := newServerMock(ctx)
if err := svr.Init(); err != nil {
panic(err)
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
var sig os.Signal
if err := svr.Start(); err != nil {
panic(err)
}
defer svr.Stop()
<-ctx.Done()
log.Print("Got signal to exit", zap.String("signal", sig.String()))
switch sig {
case syscall.SIGTERM:
os.Exit(0)
default:
os.Exit(1)
}
}
func newServerMock(ctx context.Context) *Server {
factory := pulsarms.NewFactory()
server := &Server{
node: querynode.NewQueryNodeWithoutID(ctx, factory),
}
if err := server.node.SetQueryService(&queryServiceMock{}); err != nil {
panic(err)
}
if err := server.node.SetMasterService(&MasterServiceMock{}); err != nil {
panic(err)
}
if err := server.node.SetIndexService(&IndexServiceMock{}); err != nil {
panic(err)
}
if err := server.node.SetDataService(&DataServiceMock{}); err != nil {
panic(err)
}
return server
}

View File

@ -36,7 +36,6 @@ func NewClient(address string, timeout time.Duration) (*Client, error) {
func (c *Client) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
var err error
for i := 0; i < c.retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil {

View File

@ -0,0 +1,58 @@
package grpcqueryservice
import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
var Params ParamTable
var once sync.Once
type ParamTable struct {
paramtable.BaseTable
Port int
IndexServiceAddress string
MasterAddress string
DataServiceAddress string
}
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initPort()
pt.initMasterAddress()
pt.initIndexServiceAddress()
pt.initDataServiceAddress()
})
}
func (pt *ParamTable) initMasterAddress() {
ret, err := pt.Load("_MasterAddress")
if err != nil {
panic(err)
}
pt.MasterAddress = ret
}
func (pt *ParamTable) initIndexServiceAddress() {
ret, err := pt.Load("IndexServiceAddress")
if err != nil {
panic(err)
}
pt.IndexServiceAddress = ret
}
func (pt *ParamTable) initDataServiceAddress() {
ret, err := pt.Load("_DataServiceAddress")
if err != nil {
panic(err)
}
pt.DataServiceAddress = ret
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("queryService.port")
}

View File

@ -0,0 +1,21 @@
package grpcqueryservice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.IndexServiceAddress, "")
t.Logf("IndexServiceAddress:%s", Params.IndexServiceAddress)
assert.NotEqual(t, Params.DataServiceAddress, "")
t.Logf("DataServiceAddress:%s", Params.DataServiceAddress)
assert.NotEqual(t, Params.MasterAddress, "")
t.Logf("MasterAddress:%s", Params.MasterAddress)
}

View File

@ -6,6 +6,11 @@ import (
"net"
"strconv"
"sync"
"time"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
@ -18,75 +23,146 @@ import (
)
type Server struct {
grpcServer *grpc.Server
grpcError error
grpcErrMux sync.Mutex
wg sync.WaitGroup
loopCtx context.Context
loopCancel context.CancelFunc
grpcServer *grpc.Server
queryService *qs.QueryService
grpcErrChan chan error
impl *qs.QueryService
msFactory msgstream.Factory
dataService *dsc.Client
masterService *msc.GrpcClient
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
service, err := qs.NewQueryService(ctx1, factory)
svr, err := qs.NewQueryService(ctx1, factory)
if err != nil {
cancel()
return nil, err
}
return &Server{
queryService: service,
loopCtx: ctx1,
loopCancel: cancel,
msFactory: factory,
impl: svr,
loopCtx: ctx1,
loopCancel: cancel,
msFactory: factory,
grpcErrChan: make(chan error),
}, nil
}
func (s *Server) Init() error {
log.Println("query service init")
if err := s.queryService.Init(); err != nil {
func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
log.Println("queryservice init done ...")
if err := s.start(); err != nil {
return err
}
return nil
}
func (s *Server) Start() error {
log.Println("start query service ...")
func (s *Server) init() error {
Params.Init()
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
if err := <-s.grpcErrChan; err != nil {
return err
}
// --- Master Server Client ---
log.Println("Master service address:", Params.MasterAddress)
log.Println("Init master service client ...")
masterService, err := msc.NewClient(Params.MasterAddress, 20*time.Second)
if err != nil {
panic(err)
}
if err = masterService.Init(); err != nil {
panic(err)
}
if err = masterService.Start(); err != nil {
panic(err)
}
// wait for master init or healthy
err = funcutil.WaitForComponentInitOrHealthy(masterService, "MasterService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetMasterService(masterService); err != nil {
panic(err)
}
// --- Data service client ---
log.Println("DataService Address:", Params.DataServiceAddress)
log.Println("QueryService Init data service client ...")
dataService := dsc.NewClient(Params.DataServiceAddress)
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
err = funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, time.Millisecond*200)
if err != nil {
panic(err)
}
if err := s.SetDataService(dataService); err != nil {
panic(err)
}
qs.Params.Init()
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
if err := s.impl.Init(); err != nil {
return err
}
return nil
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.wg.Done()
log.Println("network port: ", grpcPort)
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Printf("GrpcServer:failed to listen: %v", err)
s.grpcErrChan <- err
return
}
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()
s.grpcServer = grpc.NewServer()
querypb.RegisterQueryServiceServer(s.grpcServer, s)
log.Println("Starting start query service Server")
lis, err := net.Listen("tcp", ":"+strconv.Itoa(qs.Params.Port))
if err != nil {
return err
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err
}
}
go func() {
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrMux.Lock()
defer s.grpcErrMux.Unlock()
s.grpcError = err
}
}()
s.grpcErrMux.Lock()
err = s.grpcError
s.grpcErrMux.Unlock()
if err != nil {
return err
}
s.queryService.Start()
return nil
func (s *Server) start() error {
return s.impl.Start()
}
func (s *Server) Stop() error {
err := s.queryService.Stop()
err := s.impl.Stop()
s.loopCancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
@ -95,7 +171,7 @@ func (s *Server) Stop() error {
}
func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) {
componentStates, err := s.queryService.GetComponentStates()
componentStates, err := s.impl.GetComponentStates()
if err != nil {
return &internalpb2.ComponentStates{
Status: &commonpb.Status{
@ -109,7 +185,7 @@ func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*
}
func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
channel, err := s.queryService.GetTimeTickChannel()
channel, err := s.impl.GetTimeTickChannel()
if err != nil {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@ -129,7 +205,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*
}
func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
statisticsChannel, err := s.queryService.GetStatisticsChannel()
statisticsChannel, err := s.impl.GetStatisticsChannel()
if err != nil {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@ -149,51 +225,51 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty)
}
func (s *Server) SetMasterService(m qs.MasterServiceInterface) error {
s.queryService.SetMasterService(m)
s.impl.SetMasterService(m)
return nil
}
func (s *Server) SetDataService(d qs.DataServiceInterface) error {
s.queryService.SetDataService(d)
s.impl.SetDataService(d)
return nil
}
func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
return s.queryService.RegisterNode(req)
return s.impl.RegisterNode(req)
}
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
return s.queryService.ShowCollections(req)
return s.impl.ShowCollections(req)
}
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
return s.queryService.LoadCollection(req)
return s.impl.LoadCollection(req)
}
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
return s.queryService.ReleaseCollection(req)
return s.impl.ReleaseCollection(req)
}
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) {
return s.queryService.ShowPartitions(req)
return s.impl.ShowPartitions(req)
}
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
return s.queryService.GetPartitionStates(req)
return s.impl.GetPartitionStates(req)
}
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error) {
return s.queryService.LoadPartitions(req)
return s.impl.LoadPartitions(req)
}
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error) {
return s.queryService.ReleasePartitions(req)
return s.impl.ReleasePartitions(req)
}
func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*querypb.CreateQueryChannelResponse, error) {
return s.queryService.CreateQueryChannel()
return s.impl.CreateQueryChannel()
}
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return s.queryService.GetSegmentInfo(req)
return s.impl.GetSegmentInfo(req)
}

View File

@ -58,7 +58,7 @@ func NewNodeImpl(ctx context.Context) (*NodeImpl, error) {
func (i *NodeImpl) Init() error {
log.Println("AAAAAAAAAAAAAAAAA", i.serviceClient)
err := funcutil.WaitForComponentReady(i.serviceClient, "IndexService", 10, time.Second)
err := funcutil.WaitForComponentHealthy(i.serviceClient, "IndexService", 10, time.Second)
log.Println("BBBBBBBBB", i.serviceClient)
if err != nil {

View File

@ -93,11 +93,6 @@ type Interface interface {
//segment
DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
//get system config from master, not used currently
//GetSysConfigs(in *milvuspb.SysConfigRequest)
//GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error)
}
// ------------------ struct -----------------------
@ -182,7 +177,7 @@ type Core struct {
//call once
initOnce sync.Once
startOnce sync.Once
isInit atomic.Value
//isInit atomic.Value
msFactory ms.Factory
}
@ -192,17 +187,19 @@ type Core struct {
func NewCore(c context.Context, factory ms.Factory) (*Core, error) {
ctx, cancel := context.WithCancel(c)
rand.Seed(time.Now().UnixNano())
Params.Init()
core := &Core{
ctx: ctx,
cancel: cancel,
msFactory: factory,
}
core.stateCode.Store(internalpb2.StateCode_INITIALIZING)
core.isInit.Store(false)
core.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return core, nil
}
func (c *Core) UpdateStateCode(code internalpb2.StateCode) {
c.stateCode.Store(code)
}
func (c *Core) checkInit() error {
if c.MetaTable == nil {
return errors.Errorf("MetaTable is nil")
@ -788,7 +785,6 @@ func (c *Core) Init() error {
c.ddReqQueue = make(chan reqTask, 1024)
c.indexTaskQueue = make(chan *CreateIndexTask, 1024)
initError = c.setMsgStreams()
c.isInit.Store(true)
})
if initError == nil {
log.Printf("Master service State Code = %s", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)])
@ -797,10 +793,6 @@ func (c *Core) Init() error {
}
func (c *Core) Start() error {
isInit := c.isInit.Load().(bool)
if !isInit {
return errors.Errorf("call init before start")
}
if err := c.checkInit(); err != nil {
return err
}

View File

@ -162,6 +162,7 @@ func TestMasterService(t *testing.T) {
defer cancel()
msFactory := pulsarms.NewFactory()
Params.Init()
core, err := NewCore(ctx, msFactory)
assert.Nil(t, err)
randVal := rand.Int()

View File

@ -12,9 +12,7 @@ var once sync.Once
type ParamTable struct {
paramtable.BaseTable
Address string
Port int
NodeID uint64
NodeID uint64
PulsarAddress string
EtcdAddress string
@ -43,8 +41,6 @@ func (p *ParamTable) Init() {
panic(err)
}
p.initAddress()
p.initPort()
p.initNodeID()
p.initPulsarAddress()
@ -65,18 +61,6 @@ func (p *ParamTable) Init() {
})
}
func (p *ParamTable) initAddress() {
masterAddress, err := p.Load("master.address")
if err != nil {
panic(err)
}
p.Address = masterAddress
}
func (p *ParamTable) initPort() {
p.Port = p.ParseInt("master.port")
}
func (p *ParamTable) initNodeID() {
p.NodeID = uint64(p.ParseInt64("master.nodeID"))
}

View File

@ -9,12 +9,6 @@ import (
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.Address, "")
t.Logf("master address = %s", Params.Address)
assert.NotEqual(t, Params.Port, 0)
t.Logf("master port = %d", Params.Port)
assert.NotEqual(t, Params.NodeID, 0)
t.Logf("master node ID = %d", Params.NodeID)

View File

@ -131,12 +131,11 @@ func (s *ServiceImpl) Init() error {
s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream)
log.Println("create time tick ...")
s.stateCode = internalpb2.StateCode_HEALTHY
return nil
}
func (s *ServiceImpl) Start() error {
s.stateCode = internalpb2.StateCode_HEALTHY
s.sched.Start()
log.Println("start scheduler ...")
return s.tick.Start()

View File

@ -5,8 +5,6 @@ import (
"math/rand"
"time"
"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -21,8 +19,7 @@ type ServiceImpl struct {
//subStates *internalpb2.ComponentStates
dataServiceClient *dataservice.Client
nodeStartParams []*commonpb.KeyValuePair
nodeStartParams []*commonpb.KeyValuePair
ctx context.Context
cancel context.CancelFunc

View File

@ -6,7 +6,6 @@ import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
@ -17,7 +16,6 @@ type ParamTable struct {
ETCDAddress string
MetaRootPath string
WriteNodeSegKvSubPath string
IndexBuilderAddress string
QueryNodeIP string
QueryNodePort int64
@ -75,11 +73,6 @@ func (p *ParamTable) Init() {
panic(err)
}
err = p.LoadYaml("milvus.yaml")
if err != nil {
panic(err)
}
queryNodeIDStr := os.Getenv("QUERY_NODE_ID")
if queryNodeIDStr == "" {
queryNodeIDList := p.QueryNodeIDList()
@ -90,18 +83,6 @@ func (p *ParamTable) Init() {
}
}
queryNodeIP := os.Getenv("QUERY_NODE_IP")
if queryNodeIP == "" {
p.QueryNodeIP = "localhost"
} else {
p.QueryNodeIP = queryNodeIP
}
p.QueryNodePort = int64(funcutil.GetAvailablePort())
err = p.LoadYaml("advanced/common.yaml")
if err != nil {
panic(err)
}
err = p.Save("_queryNodeID", queryNodeIDStr)
if err != nil {
panic(err)
@ -122,7 +103,6 @@ func (p *ParamTable) Init() {
p.initETCDAddress()
p.initMetaRootPath()
p.initWriteNodeSegKvSubPath()
p.initIndexBuilderAddress()
p.initGracefulTime()
p.initMsgChannelSubName()
@ -234,14 +214,6 @@ func (p *ParamTable) initPulsarAddress() {
p.PulsarAddress = url
}
func (p *ParamTable) initIndexBuilderAddress() {
ret, err := p.Load("_IndexBuilderAddress")
if err != nil {
panic(err)
}
p.IndexBuilderAddress = ret
}
func (p *ParamTable) initInsertChannelRange() {
insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
if err != nil {

View File

@ -48,7 +48,7 @@ type QueryNode struct {
queryNodeLoopCtx context.Context
queryNodeLoopCancel context.CancelFunc
QueryNodeID uint64
QueryNodeID UniqueID
stateCode atomic.Value
replica collectionReplica
@ -72,7 +72,7 @@ type QueryNode struct {
msFactory msgstream.Factory
}
func NewQueryNode(ctx context.Context, queryNodeID uint64, factory msgstream.Factory) *QueryNode {
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
ctx1, cancel := context.WithCancel(ctx)
node := &QueryNode{
queryNodeLoopCtx: ctx1,
@ -88,7 +88,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, factory msgstream.Fac
}
node.replica = newCollectionReplicaImpl()
node.stateCode.Store(internalpb2.StateCode_INITIALIZING)
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return node
}
@ -107,13 +107,9 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
}
node.replica = newCollectionReplicaImpl()
node.stateCode.Store(internalpb2.StateCode_INITIALIZING)
return node
}
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
// TODO: delete this and call node.Init()
func Init() {
Params.Init()
return node
}
func (node *QueryNode) Init() error {
@ -193,14 +189,12 @@ func (node *QueryNode) Start() error {
//go node.metaService.start()
go node.loadService.start()
go node.statsService.start()
node.stateCode.Store(internalpb2.StateCode_HEALTHY)
<-node.queryNodeLoopCtx.Done()
node.UpdateStateCode(internalpb2.StateCode_HEALTHY)
return nil
}
func (node *QueryNode) Stop() error {
node.stateCode.Store(internalpb2.StateCode_ABNORMAL)
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
node.queryNodeLoopCancel()
// free collectionReplica
@ -225,6 +219,10 @@ func (node *QueryNode) Stop() error {
return nil
}
func (node *QueryNode) UpdateStateCode(code internalpb2.StateCode) {
node.stateCode.Store(code)
}
func (node *QueryNode) SetMasterService(master MasterServiceInterface) error {
if master == nil {
return errors.New("null master service interface")

View File

@ -26,12 +26,15 @@ const defaultPartitionID = UniqueID(2021)
type queryServiceMock struct{}
func setup() {
os.Setenv("QUERY_NODE_ID", "1")
Params.Init()
//Params.QueryNodeID = 1
Params.initQueryTimeTickChannelName()
Params.initSearchResultChannelNames()
Params.initStatsChannelName()
Params.initSearchChannelNames()
Params.MetaRootPath = "/etcd/test/root/querynode"
}
func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo {
@ -160,7 +163,7 @@ func newQueryNodeMock() *QueryNode {
}
msFactory := pulsarms.NewFactory()
svr := NewQueryNode(ctx, 0, msFactory)
svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory)
err := svr.SetQueryService(&queryServiceMock{})
if err != nil {
panic(err)
@ -208,5 +211,6 @@ func TestMain(m *testing.M) {
func TestQueryNode_Start(t *testing.T) {
localNode := newQueryNodeMock()
localNode.Start()
<-localNode.queryNodeLoopCtx.Done()
localNode.Stop()
}

View File

@ -13,7 +13,6 @@ type ParamTable struct {
paramtable.BaseTable
Address string
Port int
QueryServiceID UniqueID
// stats
@ -42,7 +41,6 @@ func (p *ParamTable) Init() {
p.initStatsChannelName()
p.initTimeTickChannelName()
p.initQueryServiceAddress()
p.initPort()
})
}
@ -70,7 +68,3 @@ func (p *ParamTable) initQueryServiceAddress() {
}
p.Address = url
}
func (p *ParamTable) initPort() {
p.Port = p.ParseInt("queryService.port")
}

View File

@ -67,33 +67,24 @@ type QueryService struct {
}
func (qs *QueryService) Init() error {
Params.Init()
qs.isInit.Store(true)
return nil
}
func (qs *QueryService) Start() error {
isInit := qs.isInit.Load().(bool)
switch {
case !isInit:
return errors.New("call start before init")
case qs.dataServiceClient == nil:
return errors.New("dataService Client not set")
case qs.masterServiceClient == nil:
return errors.New("masterService Client not set")
}
qs.stateCode.Store(internalpb2.StateCode_HEALTHY)
qs.UpdateStateCode(internalpb2.StateCode_HEALTHY)
return nil
}
func (qs *QueryService) Stop() error {
qs.loopCancel()
qs.stateCode.Store(internalpb2.StateCode_ABNORMAL)
qs.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return nil
}
func (qs *QueryService) UpdateStateCode(code internalpb2.StateCode) {
qs.stateCode.Store(code)
}
func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) {
serviceComponentInfo := &internalpb2.ComponentInfo{
NodeID: Params.QueryServiceID,
@ -624,8 +615,7 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ
qcMutex: &sync.Mutex{},
msFactory: factory,
}
service.stateCode.Store(internalpb2.StateCode_INITIALIZING)
service.isInit.Store(false)
service.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return service, nil
}

View File

@ -37,7 +37,7 @@ func GetLocalIP() string {
return ipv4.LocalIP()
}
func WaitForComponentReady(service StateComponent, serviceName string, attempts int, sleep time.Duration) error {
func WaitForComponentStates(service StateComponent, serviceName string, states []internalpb2.StateCode, attempts int, sleep time.Duration) error {
checkFunc := func() error {
resp, err := service.GetComponentStates()
if err != nil {
@ -48,18 +48,32 @@ func WaitForComponentReady(service StateComponent, serviceName string, attempts
return errors.New(resp.Status.Reason)
}
if resp.State.StateCode != internalpb2.StateCode_HEALTHY {
return errors.New("")
meet := false
for _, state := range states {
if resp.State.StateCode == state {
meet = true
break
}
}
if !meet {
msg := fmt.Sprintf("WaitForComponentStates, not meet, %s current state:%d", serviceName, resp.State.StateCode)
return errors.New(msg)
}
return nil
}
err := retry.Retry(attempts, sleep, checkFunc)
if err != nil {
errMsg := fmt.Sprintf("ProxyNode wait for %s ready failed", serviceName)
return errors.New(errMsg)
}
return nil
return retry.Retry(attempts, sleep, checkFunc)
}
func WaitForComponentInitOrHealthy(service StateComponent, serviceName string, attempts int, sleep time.Duration) error {
return WaitForComponentStates(service, serviceName, []internalpb2.StateCode{internalpb2.StateCode_INITIALIZING, internalpb2.StateCode_HEALTHY}, attempts, sleep)
}
func WaitForComponentInit(service StateComponent, serviceName string, attempts int, sleep time.Duration) error {
return WaitForComponentStates(service, serviceName, []internalpb2.StateCode{internalpb2.StateCode_INITIALIZING}, attempts, sleep)
}
func WaitForComponentHealthy(service StateComponent, serviceName string, attempts int, sleep time.Duration) error {
return WaitForComponentStates(service, serviceName, []internalpb2.StateCode{internalpb2.StateCode_HEALTHY}, attempts, sleep)
}
func ParseIndexParamsMap(mStr string) (map[string]string, error) {

View File

@ -145,6 +145,23 @@ func (gp *BaseTable) tryloadFromEnv() {
panic(err)
}
proxyServiceAddress := os.Getenv("PROXY_SERVICE_ADDRESS")
if proxyServiceAddress == "" {
addr, err := gp.Load("proxyService.address")
if err != nil {
panic(err)
}
proxyServicePort, err := gp.Load("proxyService.port")
if err != nil {
panic(err)
}
proxyServiceAddress = addr + ":" + proxyServicePort
}
err = gp.Save("_PROXY_SERVICE_ADDRESS", proxyServiceAddress)
if err != nil {
panic(err)
}
indexBuilderAddress := os.Getenv("INDEX_SERVICE_ADDRESS")
if indexBuilderAddress == "" {
indexBuilderHost, err := gp.Load("indexBuilder.address")

View File

@ -1,11 +0,0 @@
cd ../build/docker/deploy/
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 master
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 proxyservice
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 proxynode
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 indexservice
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 indexnode
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 queryservice
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 dataservice
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 querynode
docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 datanode

View File

@ -3,6 +3,12 @@ cd ..
echo "starting master"
nohup ./bin/masterservice > ~/masterservice.out 2>&1 &
echo "starting dataservice"
nohup ./bin/dataservice > ~/dataservice.out 2>&1 &
echo "starting datanode"
nohup ./bin/datanode > ~/datanode.out 2>&1 &
echo "starting proxyservice"
nohup ./bin/proxyservice > ~/proxyservice.out 2>&1 &
@ -20,12 +26,6 @@ echo "starting querynode2"
export QUERY_NODE_ID=2
nohup ./bin/querynode > ~/querynode2.out 2>&1 &
echo "starting dataservice"
nohup ./bin/dataservice > ~/dataservice.out 2>&1 &
echo "starting datanode"
nohup ./bin/datanode > ~/datanode.out 2>&1 &
echo "starting indexservice"
nohup ./bin/indexservice > ~/indexservice.out 2>&1 &