Add loop to update tsoallocator and id allocator in master service

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-01-27 16:38:18 +08:00 committed by yefu.chen
parent 68cd15af63
commit f1afd5d311
1 changed files with 24 additions and 0 deletions

View File

@ -375,6 +375,29 @@ func (c *Core) startSegmentFlushCompletedLoop() {
}
}
func (c *Core) tsLoop() {
tsoTicker := time.NewTicker(UpdateTimestampStep)
defer tsoTicker.Stop()
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
for {
select {
case <-tsoTicker.C:
if err := c.tsoAllocator.UpdateTSO(); err != nil {
log.Println("failed to update timestamp", err)
return
}
if err := c.idAllocator.UpdateID(); err != nil {
log.Println("failed to update id", err)
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Println("tsLoop is closed")
return
}
}
}
func (c *Core) setMsgStreams() error {
if Params.PulsarAddress == "" {
return errors.Errorf("PulsarAddress is empty")
@ -704,6 +727,7 @@ func (c *Core) Start() error {
go c.startDataServiceSegmentLoop()
go c.startCreateIndexLoop()
go c.startSegmentFlushCompletedLoop()
go c.tsLoop()
c.stateCode.Store(internalpb2.StateCode_HEALTHY)
})
log.Printf("Master service State Code = %s", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)])