Rename some field names in master for better code readability (#5389)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/5385/head
Cai Yudong 2021-05-25 14:03:06 +08:00 committed by GitHub
parent 4058350e30
commit dab536a6cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 117 additions and 132 deletions

View File

@ -112,10 +112,6 @@ func TestGrpcService(t *testing.T) {
msFactory := msgstream.NewPmsFactory()
svr, err := NewServer(ctx, msFactory)
assert.Nil(t, err)
svr.connectQueryService = false
svr.connectProxyService = false
svr.connectIndexService = false
svr.connectDataService = false
cms.Params.Init()
cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
@ -188,16 +184,16 @@ func TestGrpcService(t *testing.T) {
return nil
}
core.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
core.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
return []string{"file1", "file2", "file3"}, nil
}
core.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
core.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
return cms.Params.MinSegmentSizeToEnableIndex, nil
}
var binlogLock sync.Mutex
binlogPathArray := make([]string, 0, 16)
core.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
core.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
binlogLock.Lock()
defer binlogLock.Unlock()
binlogPathArray = append(binlogPathArray, binlog...)
@ -206,7 +202,7 @@ func TestGrpcService(t *testing.T) {
var dropIDLock sync.Mutex
dropID := make([]typeutil.UniqueID, 0, 16)
core.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error {
core.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error {
dropIDLock.Lock()
defer dropIDLock.Unlock()
dropID = append(dropID, indexID)
@ -214,12 +210,12 @@ func TestGrpcService(t *testing.T) {
}
collectionMetaCache := make([]string, 0, 16)
core.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
core.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
collectionMetaCache = append(collectionMetaCache, collectionName)
return nil
}
core.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
core.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
return nil
}
@ -908,14 +904,10 @@ func (m *mockQuery) Stop() error {
func TestRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
svr := Server{
masterService: &mockCore{},
ctx: ctx,
cancel: cancel,
grpcErrChan: make(chan error),
connectDataService: true,
connectProxyService: true,
connectIndexService: true,
connectQueryService: true,
masterService: &mockCore{},
ctx: ctx,
cancel: cancel,
grpcErrChan: make(chan error),
}
Params.Init()
Params.Port = 1000000

View File

@ -63,24 +63,15 @@ type Server struct {
newIndexServiceClient func(string) types.IndexService
newQueryServiceClient func(string) (types.QueryService, error)
connectProxyService bool
connectDataService bool
connectIndexService bool
connectQueryService bool
closer io.Closer
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
s := &Server{
ctx: ctx1,
cancel: cancel,
grpcErrChan: make(chan error),
connectDataService: true,
connectProxyService: true,
connectIndexService: true,
connectQueryService: true,
ctx: ctx1,
cancel: cancel,
grpcErrChan: make(chan error),
}
s.setClient()
var err error
@ -92,17 +83,56 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
}
func (s *Server) setClient() {
ctx := context.Background()
s.newProxyServiceClient = func(s string) types.ProxyService {
return psc.NewClient(s)
psClient := psc.NewClient(s)
if err := psClient.Init(); err != nil {
panic(err)
}
if err := psClient.Start(); err != nil {
panic(err)
}
if err := funcutil.WaitForComponentInitOrHealthy(ctx, psClient, "ProxyService", 1000000, 200*time.Millisecond); err != nil {
panic(err)
}
return psClient
}
s.newDataServiceClient = func(s string) types.DataService {
return dsc.NewClient(s)
dsClient := dsc.NewClient(s)
if err := dsClient.Init(); err != nil {
panic(err)
}
if err := dsClient.Start(); err != nil {
panic(err)
}
if err := funcutil.WaitForComponentInitOrHealthy(ctx, dsClient, "DataService", 1000000, 200*time.Millisecond); err != nil {
panic(err)
}
return dsClient
}
s.newIndexServiceClient = func(s string) types.IndexService {
return isc.NewClient(s)
isClient := isc.NewClient(s)
if err := isClient.Init(); err != nil {
panic(err)
}
if err := isClient.Start(); err != nil {
panic(err)
}
return isClient
}
s.newQueryServiceClient = func(s string) (types.QueryService, error) {
return qsc.NewClient(s, 5*time.Second)
qsClient, err := qsc.NewClient(s, 5*time.Second)
if err != nil {
panic(err)
}
if err := qsClient.Init(); err != nil {
panic(err)
}
if err := qsClient.Start(); err != nil {
panic(err)
}
return qsClient, nil
}
}
@ -110,7 +140,6 @@ func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
if err := s.start(); err != nil {
return err
}
@ -132,82 +161,46 @@ func (s *Server) init() error {
log.Debug("init params done")
err := s.startGrpc()
if err != nil {
if err := s.startGrpc(); err != nil {
return err
}
s.masterService.UpdateStateCode(internalpb.StateCode_Initializing)
if s.connectProxyService {
if s.newProxyServiceClient != nil {
log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress))
proxyService := s.newProxyServiceClient(Params.ProxyServiceAddress)
if err := proxyService.Init(); err != nil {
panic(err)
}
err := funcutil.WaitForComponentInitOrHealthy(ctx, proxyService, "ProxyService", 1000000, 200*time.Millisecond)
if err != nil {
panic(err)
}
if err = s.masterService.SetProxyService(ctx, proxyService); err != nil {
if err := s.masterService.SetProxyService(ctx, proxyService); err != nil {
panic(err)
}
s.proxyService = proxyService
}
if s.connectDataService {
if s.newDataServiceClient != nil {
log.Debug("data service", zap.String("address", Params.DataServiceAddress))
dataService := s.newDataServiceClient(Params.DataServiceAddress)
if err := dataService.Init(); err != nil {
panic(err)
}
if err := dataService.Start(); err != nil {
panic(err)
}
err := funcutil.WaitForComponentInitOrHealthy(ctx, dataService, "DataService", 1000000, 200*time.Millisecond)
if err != nil {
panic(err)
}
if err = s.masterService.SetDataService(ctx, dataService); err != nil {
if err := s.masterService.SetDataService(ctx, dataService); err != nil {
panic(err)
}
s.dataService = dataService
}
if s.connectIndexService {
if s.newIndexServiceClient != nil {
log.Debug("index service", zap.String("address", Params.IndexServiceAddress))
indexService := s.newIndexServiceClient(Params.IndexServiceAddress)
if err := indexService.Init(); err != nil {
panic(err)
}
if err := s.masterService.SetIndexService(indexService); err != nil {
panic(err)
}
s.indexService = indexService
}
if s.connectQueryService {
queryService, err := s.newQueryServiceClient(Params.QueryServiceAddress)
if err != nil {
panic(err)
}
if err = queryService.Init(); err != nil {
panic(err)
}
if err = queryService.Start(); err != nil {
panic(err)
}
if err = s.masterService.SetQueryService(queryService); err != nil {
if s.newQueryServiceClient != nil {
log.Debug("query service", zap.String("address", Params.QueryServiceAddress))
queryService, _ := s.newQueryServiceClient(Params.QueryServiceAddress)
if err := s.masterService.SetQueryService(queryService); err != nil {
panic(err)
}
s.queryService = queryService
}
if err := s.masterService.Init(); err != nil {
return err
}
return nil
return s.masterService.Init()
}
func (s *Server) startGrpc() error {

View File

@ -121,18 +121,18 @@ type Core struct {
DataNodeFlushedSegmentChan <-chan *ms.MsgPack
//get binlog file path from data service,
GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
GetNumRowsReq func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
CallGetBinlogFilePathsService func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
CallGetNumRowsService func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
//call index builder's client to build index, return build id
BuildIndexReq func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
DropIndexReq func(ctx context.Context, indexID typeutil.UniqueID) error
CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error)
CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error
//proxy service interface, notify proxy service to drop collection
InvalidateCollectionMetaCache func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error
CallInvalidateCollectionMetaCacheService func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error
//query service interface, notify query service to release collection
ReleaseCollection func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
//dd request scheduler
ddReqQueue chan reqTask //dd request will be push into this chan
@ -214,20 +214,23 @@ func (c *Core) checkInit() error {
if c.SendDdDropPartitionReq == nil {
return fmt.Errorf("SendDdDropPartitionReq is nil")
}
if c.GetBinlogFilePathsFromDataServiceReq == nil {
return fmt.Errorf("GetBinlogFilePathsFromDataServiceReq is nil")
if c.CallGetBinlogFilePathsService == nil {
return fmt.Errorf("CallGetBinlogFilePathsService is nil")
}
if c.GetNumRowsReq == nil {
return fmt.Errorf("GetNumRowsReq is nil")
if c.CallGetNumRowsService == nil {
return fmt.Errorf("CallGetNumRowsService is nil")
}
if c.BuildIndexReq == nil {
return fmt.Errorf("BuildIndexReq is nil")
if c.CallBuildIndexService == nil {
return fmt.Errorf("CallBuildIndexService is nil")
}
if c.DropIndexReq == nil {
return fmt.Errorf("DropIndexReq is nil")
if c.CallDropIndexService == nil {
return fmt.Errorf("CallDropIndexService is nil")
}
if c.InvalidateCollectionMetaCache == nil {
return fmt.Errorf("InvalidateCollectionMetaCache is nil")
if c.CallInvalidateCollectionMetaCacheService == nil {
return fmt.Errorf("CallInvalidateCollectionMetaCacheService is nil")
}
if c.CallReleaseCollectionService == nil {
return fmt.Errorf("CallReleaseCollectionService is nil")
}
if c.DataServiceSegmentChan == nil {
return fmt.Errorf("DataServiceSegmentChan is nil")
@ -235,9 +238,6 @@ func (c *Core) checkInit() error {
if c.DataNodeFlushedSegmentChan == nil {
return fmt.Errorf("DataNodeFlushedSegmentChan is nil")
}
if c.ReleaseCollection == nil {
return fmt.Errorf("ReleaseCollection is nil")
}
return nil
}
@ -672,7 +672,7 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error
Params.ProxyTimeTickChannel = rsp.Value
log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel))
c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error {
status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO,MsgType
@ -702,7 +702,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
Params.DataServiceSegmentChannel = rsp.Value
log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel))
c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
ts, err := c.TSOAllocator(1)
if err != nil {
return nil, err
@ -730,7 +730,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID)
}
c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
ts, err := c.TSOAllocator(1)
if err != nil {
return 0, err
@ -764,7 +764,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
}
func (c *Core) SetIndexService(s types.IndexService) error {
c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{
DataPaths: binlog,
TypeParams: field.TypeParams,
@ -781,7 +781,7 @@ func (c *Core) SetIndexService(s types.IndexService) error {
return rsp.IndexBuildID, nil
}
c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error {
c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error {
rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{
IndexID: indexID,
})
@ -798,7 +798,7 @@ func (c *Core) SetIndexService(s types.IndexService) error {
}
func (c *Core) SetQueryService(s types.QueryService) error {
c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleaseCollection,
@ -826,7 +826,7 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema,
if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) {
return 0, nil
}
rows, err := c.GetNumRowsReq(segID, isFlush)
rows, err := c.CallGetNumRowsService(segID, isFlush)
if err != nil {
return 0, err
}
@ -834,11 +834,11 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema,
if rows < Params.MinSegmentSizeToEnableIndex {
log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
} else {
binlogs, err := c.GetBinlogFilePathsFromDataServiceReq(segID, field.FieldID)
binlogs, err := c.CallGetBinlogFilePathsService(segID, field.FieldID)
if err != nil {
return 0, err
}
bldID, err = c.BuildIndexReq(c.ctx, binlogs, field, idxInfo)
bldID, err = c.CallBuildIndexService(c.ctx, binlogs, field, idxInfo)
if err != nil {
return 0, err
}

View File

@ -1919,47 +1919,47 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.DataServiceSegmentChan = make(chan *msgstream.MsgPack)
err = c.checkInit()
assert.NotNil(t, err)
c.GetBinlogFilePathsFromDataServiceReq = func(segID, fieldID typeutil.UniqueID) ([]string, error) {
c.CallGetBinlogFilePathsService = func(segID, fieldID typeutil.UniqueID) ([]string, error) {
return []string{}, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.GetNumRowsReq = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) {
return 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.BuildIndexReq = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) {
return 0, nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DropIndexReq = func(ctx context.Context, indexID typeutil.UniqueID) error {
c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.InvalidateCollectionMetaCache = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error {
c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error {
return nil
}
err = c.checkInit()
assert.NotNil(t, err)
c.DataServiceSegmentChan = make(chan *msgstream.MsgPack)
err = c.checkInit()
assert.NotNil(t, err)
c.DataNodeFlushedSegmentChan = make(chan *msgstream.MsgPack)
err = c.checkInit()
assert.NotNil(t, err)
c.ReleaseCollection = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error {
return nil
}
err = c.checkInit()
assert.Nil(t, err)
}

View File

@ -310,13 +310,13 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
//notify query service to release collection
go func() {
if err = t.core.ReleaseCollection(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
log.Warn("ReleaseCollection failed", zap.String("error", err.Error()))
if err = t.core.CallReleaseCollectionService(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error()))
}
}()
// error doesn't matter here
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
@ -483,7 +483,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
}
// error doesn't matter here
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
@ -549,7 +549,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
}
// error doesn't matter here
t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
@ -833,7 +833,7 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if len(info) != 1 {
return fmt.Errorf("len(index) = %d", len(info))
}
err = t.core.DropIndexReq(ctx, info[0].IndexID)
err = t.core.CallDropIndexService(ctx, info[0].IndexID)
if err != nil {
return err
}