fix time tick (#6017)

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
pull/6020/head
neza2017 2021-06-23 15:28:09 +08:00 committed by GitHub
parent b66ab9d809
commit 9ebca59099
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 39 deletions

View File

@ -214,5 +214,5 @@ func (p *ParamTable) initLogCfg() {
}
func (p *ParamTable) initRoleName() {
p.RoleName = "MasterService"
p.RoleName = "RootCoord"
}

View File

@ -257,27 +257,15 @@ func (c *Core) startDdScheduler() {
func (c *Core) startTimeTickLoop() {
ticker := time.NewTicker(time.Duration(Params.TimeTickInterval) * time.Millisecond)
cnt := 0
for {
select {
case <-c.ctx.Done():
log.Debug("rootcoord context closed", zap.Error(c.ctx.Err()))
return
case <-ticker.C:
if len(c.ddReqQueue) < 2 || cnt > 5 {
tt := &TimetickTask{
baseReqTask: baseReqTask{
ctx: c.ctx,
cv: make(chan error, 1),
core: c,
},
}
c.ddReqQueue <- tt
cnt = 0
} else {
cnt++
if ts, err := c.TSOAllocator(1); err == nil {
c.SendTimeTick(ts)
}
}
}
}

View File

@ -59,26 +59,6 @@ func (bt *baseReqTask) WaitToFinish() error {
}
}
type TimetickTask struct {
baseReqTask
}
func (t *TimetickTask) Ctx() context.Context {
return t.ctx
}
func (t *TimetickTask) Type() commonpb.MsgType {
return commonpb.MsgType_TimeTick
}
func (t *TimetickTask) Execute(ctx context.Context) error {
ts, err := t.core.TSOAllocator(1)
if err != nil {
return err
}
return t.core.SendTimeTick(ts)
}
type CreateCollectionReqTask struct {
baseReqTask
Req *milvuspb.CreateCollectionRequest
@ -699,7 +679,7 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
}
//TODO, get filed_id and index_name from request
segIdxInfo, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "")
log.Debug("MasterService DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID", zap.Any("SegmentID", t.Req.SegmentID),
log.Debug("RootCoord DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID", zap.Any("SegmentID", t.Req.SegmentID),
zap.Any("segIdxInfo", segIdxInfo), zap.Error(err))
if err != nil {
return err
@ -769,7 +749,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
}
indexName := Params.DefaultIndexName //TODO, get name from request
indexID, _, err := t.core.IDAllocator(1)
log.Debug("MasterService CreateIndexReqTask", zap.Any("indexID", indexID), zap.Error(err))
log.Debug("RootCoord CreateIndexReqTask", zap.Any("indexID", indexID), zap.Error(err))
if err != nil {
return err
}
@ -779,7 +759,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
IndexParams: t.Req.ExtraParams,
}
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo)
log.Debug("MasterService CreateIndexReqTask metaTable.GetNotIndexedSegments", zap.Error(err))
log.Debug("RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments", zap.Error(err))
if err != nil {
return err
}
@ -806,7 +786,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
}
_, err = t.core.MetaTable.AddIndex(segIdxInfos, "", "")
log.Debug("MasterService CreateIndexReq", zap.Any("segIdxInfos", segIdxInfos), zap.Error(err))
log.Debug("RootCoord CreateIndexReq", zap.Any("segIdxInfos", segIdxInfos), zap.Error(err))
return err
}