rootcoord connect to other components in go routines (#6117)

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/6123/head
neza2017 2021-06-25 16:48:10 +08:00 committed by GitHub
parent dc63d82595
commit da12e55c6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 31 deletions

View File

@ -18,7 +18,6 @@ import (
"net"
"strconv"
"sync"
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -81,22 +80,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
}
func (s *Server) setClient() {
ctx := context.Background()
s.newDataCoordClient = func(etcdMetaRoot string, etcdEndpoints []string) types.DataCoord {
dsClient, err := dsc.NewClient(s.ctx, etcdMetaRoot, etcdEndpoints)
if err != nil {
panic(err)
}
if err := dsClient.Init(); err != nil {
panic(err)
}
if err := dsClient.Start(); err != nil {
panic(err)
}
if err := funcutil.WaitForComponentInitOrHealthy(ctx, dsClient, "DataCoord", 1000000, 200*time.Millisecond); err != nil {
panic(err)
}
return dsClient
}
s.newIndexCoordClient = func(metaRootPath string, etcdEndpoints []string) types.IndexCoord {
@ -104,12 +92,6 @@ func (s *Server) setClient() {
if err != nil {
panic(err)
}
if err := isClient.Init(); err != nil {
panic(err)
}
if err := isClient.Start(); err != nil {
panic(err)
}
return isClient
}
s.newQueryCoordClient = func(metaRootPath string, etcdEndpoints []string) types.QueryCoord {
@ -117,12 +99,6 @@ func (s *Server) setClient() {
if err != nil {
panic(err)
}
if err := qsClient.Init(); err != nil {
panic(err)
}
if err := qsClient.Start(); err != nil {
panic(err)
}
return qsClient
}
}

View File

@ -663,13 +663,19 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy,
}
func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
rsp, err := s.GetSegmentInfoChannel(ctx)
if err != nil {
return err
}
Params.DataCoordSegmentChannel = rsp.Value
log.Debug("data service segment", zap.String("channel name", Params.DataCoordSegmentChannel))
initCh := make(chan struct{})
go func() {
for {
if err := s.Init(); err == nil {
if err := s.Start(); err == nil {
close(initCh)
log.Debug("RootCoord connect to DataCoord")
return
}
}
log.Debug("RootCoord connect to DataCoord, retry")
}
}()
c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) {
defer func() {
if err := recover(); err != nil {
@ -677,6 +683,7 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
retErr = fmt.Errorf("get bin log file paths panic, msg = %v", err)
}
}()
<-initCh //wait connect to data coord
ts, err := c.TSOAllocator(1)
if err != nil {
retFiles = nil
@ -722,6 +729,7 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
return
}
}()
<-initCh
ts, err := c.TSOAllocator(1)
if err != nil {
retRows = 0
@ -765,6 +773,20 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
}
func (c *Core) SetIndexCoord(s types.IndexCoord) error {
initCh := make(chan struct{})
go func() {
for {
if err := s.Init(); err == nil {
if err := s.Start(); err == nil {
close(initCh)
log.Debug("RootCoord connect to IndexCoord")
return
}
}
log.Debug("RootCoord connect to IndexCoord, retry")
}
}()
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) {
defer func() {
if err := recover(); err != nil {
@ -773,6 +795,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
return
}
}()
<-initCh
rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{
DataPaths: binlog,
TypeParams: field.TypeParams,
@ -802,6 +825,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
return
}
}()
<-initCh
rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{
IndexID: indexID,
})
@ -821,6 +845,19 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
}
func (c *Core) SetQueryCoord(s types.QueryCoord) error {
initCh := make(chan struct{})
go func() {
for {
if err := s.Init(); err == nil {
if err := s.Start(); err == nil {
close(initCh)
log.Debug("RootCoord connect to QueryCoord")
return
}
}
log.Debug("RootCoord connect to QueryCoord, retry")
}
}()
c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) (retErr error) {
defer func() {
if err := recover(); err != nil {
@ -828,6 +865,7 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error {
return
}
}()
<-initCh
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
@ -856,6 +894,7 @@ func (c *Core) SetQueryCoord(s types.QueryCoord) error {
retErr = fmt.Errorf("release partition from query service panic, msg = %v", err)
}
}()
<-initCh
req := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,

View File

@ -68,6 +68,14 @@ type dataMock struct {
randVal int
}
func (d *dataMock) Init() error {
return nil
}
func (d *dataMock) Start() error {
return nil
}
func (d *dataMock) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
rst := &datapb.GetInsertBinlogPathsResponse{
FieldIDs: []int64{},
@ -121,6 +129,14 @@ type queryMock struct {
mutex sync.Mutex
}
func (q *queryMock) Init() error {
return nil
}
func (q *queryMock) Start() error {
return nil
}
func (q *queryMock) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
@ -147,6 +163,14 @@ type indexMock struct {
mutex sync.Mutex
}
func (idx *indexMock) Init() error {
return nil
}
func (idx *indexMock) Start() error {
return nil
}
func (idx *indexMock) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
idx.mutex.Lock()
defer idx.mutex.Unlock()