Add proxy service timetick loop to expire allcoations

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-03-27 14:01:52 +08:00 committed by yefu.chen
parent 35656ea9ce
commit a78cd3e2a7
2 changed files with 42 additions and 1 deletions

View File

@ -34,6 +34,7 @@ type ParamTable struct {
SegmentInfoChannelName string
DataServiceSubscriptionName string
K2SChannelNames []string
ProxyTimeTickChannelName string
SegmentFlushMetaPath string
Log log.Config
@ -73,6 +74,7 @@ func (p *ParamTable) Init() {
p.initK2SChannelNames()
p.initSegmentFlushMetaPath()
p.initLogCfg()
p.initProxyServiceTimeTickChannelName()
})
}
@ -243,3 +245,11 @@ func (p *ParamTable) initLogCfg() {
p.Log.File.Filename = ""
}
}
func (p *ParamTable) initProxyServiceTimeTickChannelName() {
ch, err := p.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick")
if err != nil {
panic(err)
}
p.ProxyTimeTickChannelName = ch
}

View File

@ -288,9 +288,10 @@ func (s *Server) checkMasterIsHealthy() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(2)
s.serverLoopWg.Add(3)
go s.startStatsChannel(s.serverLoopCtx)
go s.startSegmentFlushChannel(s.serverLoopCtx)
go s.startProxyServiceTimeTickLoop(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
@ -354,6 +355,36 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
}
}
func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
flushStream, _ := s.msFactory.NewMsgStream(ctx)
flushStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName)
flushStream.Start()
defer flushStream.Close()
for {
select {
case <-ctx.Done():
log.Debug("Proxy service timetick loop shut down")
default:
}
msgPack := flushStream.Consume()
s.allocMu.Lock()
for _, msg := range msgPack.Msgs {
if msg.Type() != commonpb.MsgType_TimeTick {
log.Warn("receive unknown msg from proxy service timetick", zap.Stringer("msgType", msg.Type()))
continue
}
tMsg := msg.(*msgstream.TimeTickMsg)
traceCtx := context.TODO()
if err := s.segAllocator.ExpireAllocations(traceCtx, tMsg.Base.Timestamp); err != nil {
log.Error("expire allocations error", zap.Error(err))
}
}
s.allocMu.Unlock()
}
}
func (s *Server) startDDChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
ddStream, _ := s.msFactory.NewMsgStream(ctx)