mirror of https://github.com/milvus-io/milvus.git
Fix datanode ttNode goroutine leak (#27878)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/27825/head
parent
4faba61647
commit
b9d5ef3599
|
@ -52,6 +52,8 @@ type ttNode struct {
|
|||
updateCPLock sync.Mutex
|
||||
notifyChannel chan checkPoint
|
||||
closeChannel chan struct{}
|
||||
closeOnce sync.Once
|
||||
closeWg sync.WaitGroup
|
||||
}
|
||||
|
||||
type checkPoint struct {
|
||||
|
@ -76,13 +78,19 @@ func (ttn *ttNode) IsValidInMsg(in []Msg) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (ttn *ttNode) Close() {
|
||||
ttn.closeOnce.Do(func() {
|
||||
close(ttn.closeChannel)
|
||||
ttn.closeWg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// Operate handles input messages, implementing flowgraph.Node
|
||||
func (ttn *ttNode) Operate(in []Msg) []Msg {
|
||||
fgMsg := in[0].(*flowGraphMsg)
|
||||
curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax)
|
||||
if fgMsg.IsCloseMsg() {
|
||||
if len(fgMsg.endPositions) > 0 {
|
||||
close(ttn.closeChannel)
|
||||
channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0])
|
||||
log.Info("flowgraph is closing, force update channel CP",
|
||||
zap.Time("cpTs", tsoutil.PhysicalTime(channelPos.GetTimestamp())),
|
||||
|
@ -151,13 +159,17 @@ func newTTNode(config *nodeConfig, broker broker.Broker) (*ttNode, error) {
|
|||
broker: broker,
|
||||
notifyChannel: make(chan checkPoint, 1),
|
||||
closeChannel: make(chan struct{}),
|
||||
closeWg: sync.WaitGroup{},
|
||||
}
|
||||
|
||||
// check point updater
|
||||
tt.closeWg.Add(1)
|
||||
go func() {
|
||||
defer tt.closeWg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-tt.closeChannel:
|
||||
log.Info("ttNode updater exited", zap.String("channel", tt.vChannelName))
|
||||
return
|
||||
case cp := <-tt.notifyChannel:
|
||||
tt.updateChannelCP(cp.pos, cp.curTs)
|
||||
|
|
Loading…
Reference in New Issue