Fix memory leak casued by unclosed channel in msgstream (#16051)

issue: #16045
Signed-off-by: sunby <bingyi.sun@zilliz.com>

Co-authored-by: sunby <bingyi.sun@zilliz.com>
pull/15155/head
Bingyi Sun 2022-03-15 15:55:21 +08:00 committed by GitHub
parent faae6732e5
commit d3d5a41b76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 14 deletions

View File

@ -22,6 +22,7 @@ import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
@ -55,6 +56,7 @@ type mqMsgStream struct {
producerLock *sync.Mutex
consumerLock *sync.Mutex
readerLock *sync.Mutex
closed int32
}
// NewMqMsgStream is used to generate a new mqMsgStream object
@ -87,6 +89,7 @@ func NewMqMsgStream(ctx context.Context,
consumerLock: &sync.Mutex{},
readerLock: &sync.Mutex{},
wait: &sync.WaitGroup{},
closed: 0,
}
return stream, nil
@ -183,6 +186,9 @@ func (ms *mqMsgStream) Start() {
}
func (ms *mqMsgStream) Close() {
if !atomic.CompareAndSwapInt32(&ms.closed, 0, 1) {
return
}
ms.streamCancel()
ms.wait.Wait()
@ -198,6 +204,7 @@ func (ms *mqMsgStream) Close() {
}
ms.client.Close()
close(ms.receiveBuf)
}
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
@ -649,21 +656,8 @@ func (ms *MqTtMsgStream) Start() {
// Close will stop goroutine and free internal producers and consumers
func (ms *MqTtMsgStream) Close() {
ms.streamCancel()
close(ms.syncConsumer)
ms.wait.Wait()
for _, producer := range ms.producers {
if producer != nil {
producer.Close()
}
}
for _, consumer := range ms.consumers {
if consumer != nil {
consumer.Close()
}
}
ms.client.Close()
ms.mqMsgStream.Close()
}
func (ms *MqTtMsgStream) bufMsgPackToChannel() {