From dab536a6cff0c12d9db6b548ea95f16471e229dd Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Tue, 25 May 2021 14:03:06 +0800 Subject: [PATCH] Rename some field names in master for better code readability (#5389) Signed-off-by: yudong.cai --- .../masterservice/masterservice_test.go | 28 ++-- internal/distributed/masterservice/server.go | 123 +++++++++--------- internal/masterservice/master_service.go | 56 ++++---- internal/masterservice/master_service_test.go | 30 ++--- internal/masterservice/task.go | 12 +- 5 files changed, 117 insertions(+), 132 deletions(-) diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 31cde5e59b..486408e9b3 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -112,10 +112,6 @@ func TestGrpcService(t *testing.T) { msFactory := msgstream.NewPmsFactory() svr, err := NewServer(ctx, msFactory) assert.Nil(t, err) - svr.connectQueryService = false - svr.connectProxyService = false - svr.connectIndexService = false - svr.connectDataService = false cms.Params.Init() cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) @@ -188,16 +184,16 @@ func TestGrpcService(t *testing.T) { return nil } - core.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { + core.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { return []string{"file1", "file2", "file3"}, nil } - core.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { + core.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { return cms.Params.MinSegmentSizeToEnableIndex, nil } var binlogLock sync.Mutex binlogPathArray := make([]string, 0, 16) - core.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { + core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { binlogLock.Lock() defer binlogLock.Unlock() binlogPathArray = append(binlogPathArray, binlog...) @@ -206,7 +202,7 @@ func TestGrpcService(t *testing.T) { var dropIDLock sync.Mutex dropID := make([]typeutil.UniqueID, 0, 16) - core.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error { + core.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error { dropIDLock.Lock() defer dropIDLock.Unlock() dropID = append(dropID, indexID) @@ -214,12 +210,12 @@ func TestGrpcService(t *testing.T) { } collectionMetaCache := make([]string, 0, 16) - core.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error { + core.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error { collectionMetaCache = append(collectionMetaCache, collectionName) return nil } - core.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { + core.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { return nil } @@ -908,14 +904,10 @@ func (m *mockQuery) Stop() error { func TestRun(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) svr := Server{ - masterService: &mockCore{}, - ctx: ctx, - cancel: cancel, - grpcErrChan: make(chan error), - connectDataService: true, - connectProxyService: true, - connectIndexService: true, - connectQueryService: true, + masterService: &mockCore{}, + ctx: ctx, + cancel: cancel, + grpcErrChan: make(chan error), } Params.Init() Params.Port = 1000000 diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 586ba204a7..ad609d2db0 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -63,24 +63,15 @@ type Server struct { newIndexServiceClient func(string) types.IndexService newQueryServiceClient func(string) (types.QueryService, error) - connectProxyService bool - connectDataService bool - connectIndexService bool - connectQueryService bool - closer io.Closer } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) s := &Server{ - ctx: ctx1, - cancel: cancel, - grpcErrChan: make(chan error), - connectDataService: true, - connectProxyService: true, - connectIndexService: true, - connectQueryService: true, + ctx: ctx1, + cancel: cancel, + grpcErrChan: make(chan error), } s.setClient() var err error @@ -92,17 +83,56 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) } func (s *Server) setClient() { + ctx := context.Background() + s.newProxyServiceClient = func(s string) types.ProxyService { - return psc.NewClient(s) + psClient := psc.NewClient(s) + if err := psClient.Init(); err != nil { + panic(err) + } + if err := psClient.Start(); err != nil { + panic(err) + } + if err := funcutil.WaitForComponentInitOrHealthy(ctx, psClient, "ProxyService", 1000000, 200*time.Millisecond); err != nil { + panic(err) + } + return psClient } s.newDataServiceClient = func(s string) types.DataService { - return dsc.NewClient(s) + dsClient := dsc.NewClient(s) + if err := dsClient.Init(); err != nil { + panic(err) + } + if err := dsClient.Start(); err != nil { + panic(err) + } + if err := funcutil.WaitForComponentInitOrHealthy(ctx, dsClient, "DataService", 1000000, 200*time.Millisecond); err != nil { + panic(err) + } + return dsClient } s.newIndexServiceClient = func(s string) types.IndexService { - return isc.NewClient(s) + isClient := isc.NewClient(s) + if err := isClient.Init(); err != nil { + panic(err) + } + if err := isClient.Start(); err != nil { + panic(err) + } + return isClient } s.newQueryServiceClient = func(s string) (types.QueryService, error) { - return qsc.NewClient(s, 5*time.Second) + qsClient, err := qsc.NewClient(s, 5*time.Second) + if err != nil { + panic(err) + } + if err := qsClient.Init(); err != nil { + panic(err) + } + if err := qsClient.Start(); err != nil { + panic(err) + } + return qsClient, nil } } @@ -110,7 +140,6 @@ func (s *Server) Run() error { if err := s.init(); err != nil { return err } - if err := s.start(); err != nil { return err } @@ -132,82 +161,46 @@ func (s *Server) init() error { log.Debug("init params done") - err := s.startGrpc() - if err != nil { + if err := s.startGrpc(); err != nil { return err } s.masterService.UpdateStateCode(internalpb.StateCode_Initializing) - if s.connectProxyService { + if s.newProxyServiceClient != nil { log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress)) proxyService := s.newProxyServiceClient(Params.ProxyServiceAddress) - if err := proxyService.Init(); err != nil { - panic(err) - } - - err := funcutil.WaitForComponentInitOrHealthy(ctx, proxyService, "ProxyService", 1000000, 200*time.Millisecond) - if err != nil { - panic(err) - } - - if err = s.masterService.SetProxyService(ctx, proxyService); err != nil { + if err := s.masterService.SetProxyService(ctx, proxyService); err != nil { panic(err) } s.proxyService = proxyService } - if s.connectDataService { + if s.newDataServiceClient != nil { log.Debug("data service", zap.String("address", Params.DataServiceAddress)) dataService := s.newDataServiceClient(Params.DataServiceAddress) - if err := dataService.Init(); err != nil { - panic(err) - } - if err := dataService.Start(); err != nil { - panic(err) - } - err := funcutil.WaitForComponentInitOrHealthy(ctx, dataService, "DataService", 1000000, 200*time.Millisecond) - if err != nil { - panic(err) - } - - if err = s.masterService.SetDataService(ctx, dataService); err != nil { + if err := s.masterService.SetDataService(ctx, dataService); err != nil { panic(err) } s.dataService = dataService } - if s.connectIndexService { + if s.newIndexServiceClient != nil { log.Debug("index service", zap.String("address", Params.IndexServiceAddress)) indexService := s.newIndexServiceClient(Params.IndexServiceAddress) - if err := indexService.Init(); err != nil { - panic(err) - } - if err := s.masterService.SetIndexService(indexService); err != nil { panic(err) - } s.indexService = indexService } - if s.connectQueryService { - queryService, err := s.newQueryServiceClient(Params.QueryServiceAddress) - if err != nil { - panic(err) - } - if err = queryService.Init(); err != nil { - panic(err) - } - if err = queryService.Start(); err != nil { - panic(err) - } - if err = s.masterService.SetQueryService(queryService); err != nil { + if s.newQueryServiceClient != nil { + log.Debug("query service", zap.String("address", Params.QueryServiceAddress)) + queryService, _ := s.newQueryServiceClient(Params.QueryServiceAddress) + if err := s.masterService.SetQueryService(queryService); err != nil { panic(err) } s.queryService = queryService } - if err := s.masterService.Init(); err != nil { - return err - } - return nil + + return s.masterService.Init() } func (s *Server) startGrpc() error { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 1d733c23f3..1c04ea94c2 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -121,18 +121,18 @@ type Core struct { DataNodeFlushedSegmentChan <-chan *ms.MsgPack //get binlog file path from data service, - GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) - GetNumRowsReq func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) + CallGetBinlogFilePathsService func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) + CallGetNumRowsService func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) //call index builder's client to build index, return build id - BuildIndexReq func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) - DropIndexReq func(ctx context.Context, indexID typeutil.UniqueID) error + CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) + CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error //proxy service interface, notify proxy service to drop collection - InvalidateCollectionMetaCache func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error + CallInvalidateCollectionMetaCacheService func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error //query service interface, notify query service to release collection - ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error + CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error //dd request scheduler ddReqQueue chan reqTask //dd request will be push into this chan @@ -214,20 +214,23 @@ func (c *Core) checkInit() error { if c.SendDdDropPartitionReq == nil { return fmt.Errorf("SendDdDropPartitionReq is nil") } - if c.GetBinlogFilePathsFromDataServiceReq == nil { - return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil") + if c.CallGetBinlogFilePathsService == nil { + return fmt.Errorf("CallGetBinlogFilePathsService is nil") } - if c.GetNumRowsReq == nil { - return fmt.Errorf("GetNumRowsReq is nil") + if c.CallGetNumRowsService == nil { + return fmt.Errorf("CallGetNumRowsService is nil") } - if c.BuildIndexReq == nil { - return fmt.Errorf("BuildIndexReq is nil") + if c.CallBuildIndexService == nil { + return fmt.Errorf("CallBuildIndexService is nil") } - if c.DropIndexReq == nil { - return fmt.Errorf("DropIndexReq is nil") + if c.CallDropIndexService == nil { + return fmt.Errorf("CallDropIndexService is nil") } - if c.InvalidateCollectionMetaCache == nil { - return fmt.Errorf("InvalidateCollectionMetaCache is nil") + if c.CallInvalidateCollectionMetaCacheService == nil { + return fmt.Errorf("CallInvalidateCollectionMetaCacheService is nil") + } + if c.CallReleaseCollectionService == nil { + return fmt.Errorf("CallReleaseCollectionService is nil") } if c.DataServiceSegmentChan == nil { return fmt.Errorf("DataServiceSegmentChan is nil") @@ -235,9 +238,6 @@ func (c *Core) checkInit() error { if c.DataNodeFlushedSegmentChan == nil { return fmt.Errorf("DataNodeFlushedSegmentChan is nil") } - if c.ReleaseCollection == nil { - return fmt.Errorf("ReleaseCollection is nil") - } return nil } @@ -672,7 +672,7 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error Params.ProxyTimeTickChannel = rsp.Value log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel)) - c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error { + c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error { status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ Base: &commonpb.MsgBase{ MsgType: 0, //TODO,MsgType @@ -702,7 +702,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { Params.DataServiceSegmentChannel = rsp.Value log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) - c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { + c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { ts, err := c.TSOAllocator(1) if err != nil { return nil, err @@ -730,7 +730,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID) } - c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { + c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { ts, err := c.TSOAllocator(1) if err != nil { return 0, err @@ -764,7 +764,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { } func (c *Core) SetIndexService(s types.IndexService) error { - c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { + c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{ DataPaths: binlog, TypeParams: field.TypeParams, @@ -781,7 +781,7 @@ func (c *Core) SetIndexService(s types.IndexService) error { return rsp.IndexBuildID, nil } - c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error { + c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error { rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{ IndexID: indexID, }) @@ -798,7 +798,7 @@ func (c *Core) SetIndexService(s types.IndexService) error { } func (c *Core) SetQueryService(s types.QueryService) error { - c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { + c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { req := &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseCollection, @@ -826,7 +826,7 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) { return 0, nil } - rows, err := c.GetNumRowsReq(segID, isFlush) + rows, err := c.CallGetNumRowsService(segID, isFlush) if err != nil { return 0, err } @@ -834,11 +834,11 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, if rows < Params.MinSegmentSizeToEnableIndex { log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows)) } else { - binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID) + binlogs, err := c.CallGetBinlogFilePathsService(segID, field.FieldID) if err != nil { return 0, err } - bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo) + bldID, err = c.CallBuildIndexService(c.ctx, binlogs, field, idxInfo) if err != nil { return 0, err } diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index c3bd5fa58b..0446dcf0d3 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -1919,47 +1919,47 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.DataServiceSegmentChan = make(chan *msgstream.MsgPack) - err = c.checkInit() - assert.NotNil(t, err) - - c.GetBinlogFilePathsFromDataServiceReq = func(segID, fieldID typeutil.UniqueID) ([]string, error) { + c.CallGetBinlogFilePathsService = func(segID, fieldID typeutil.UniqueID) ([]string, error) { return []string{}, nil } err = c.checkInit() assert.NotNil(t, err) - c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { + c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { return 0, nil } err = c.checkInit() assert.NotNil(t, err) - c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { + c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { return 0, nil } err = c.checkInit() assert.NotNil(t, err) - c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error { + c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error { return nil } err = c.checkInit() assert.NotNil(t, err) - c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error { + c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error { return nil } err = c.checkInit() assert.NotNil(t, err) + c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error { + return nil + } + err = c.checkInit() + assert.NotNil(t, err) + + c.DataServiceSegmentChan = make(chan *msgstream.MsgPack) + err = c.checkInit() + assert.NotNil(t, err) + c.DataNodeFlushedSegmentChan = make(chan *msgstream.MsgPack) err = c.checkInit() - assert.NotNil(t, err) - - c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error { - return nil - } - err = c.checkInit() assert.Nil(t, err) } diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index be99ab1d34..cf151c7452 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -310,13 +310,13 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { //notify query service to release collection go func() { - if err = t.core.ReleaseCollection(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil { - log.Warn("ReleaseCollection failed", zap.String("error", err.Error())) + if err = t.core.CallReleaseCollectionService(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil { + log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error())) } }() // error doesn't matter here - t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -483,7 +483,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { } // error doesn't matter here - t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -549,7 +549,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { } // error doesn't matter here - t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -833,7 +833,7 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error { if len(info) != 1 { return fmt.Errorf("len(index) = %d", len(info)) } - err = t.core.DropIndexReq(ctx, info[0].IndexID) + err = t.core.CallDropIndexService(ctx, info[0].IndexID) if err != nil { return err }