Fix pulsar reader not close (#14543)

Signed-off-by: godchen0212 <qingxiang.chen@zilliz.com>
pull/14773/head
godchen 2022-01-04 14:45:19 +08:00 committed by GitHub
parent d4ac010165
commit 9e51591b3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 39 additions and 11 deletions

View File

@ -226,6 +226,13 @@ func (ms *mqMsgStream) Close() {
consumer.Close() consumer.Close()
} }
} }
for _, reader := range ms.readers {
if reader != nil {
reader.Close()
}
}
ms.client.Close()
} }
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 { func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {

View File

@ -462,10 +462,14 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
if err != nil { if err != nil {
return err return err
} }
defer stream.Close()
pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName) pChannelName := rootcoord.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName position.ChannelName = pChannelName
stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.QueryNodeID, collectionID)) stream.AsReader([]string{pChannelName}, fmt.Sprintf("querynode-%d-%d", Params.QueryNodeCfg.QueryNodeID, collectionID))
stream.SeekReaders([]*internalpb.MsgPosition{position}) err = stream.SeekReaders([]*internalpb.MsgPosition{position})
if err != nil {
return err
}
delData := &deleteData{ delData := &deleteData{
deleteIDs: make(map[UniqueID][]int64), deleteIDs: make(map[UniqueID][]int64),
@ -518,7 +522,6 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
go deletePk(loader.historicalReplica, delData, segmentID, &wg) go deletePk(loader.historicalReplica, delData, segmentID, &wg)
} }
wg.Wait() wg.Wait()
stream.Close()
log.Debug("from dml check point load done", zap.Any("msg id", position.GetMsgID())) log.Debug("from dml check point load done", zap.Any("msg id", position.GetMsgID()))
return nil return nil
} }

View File

@ -625,7 +625,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
for _, info := range w.req.Infos { for _, info := range w.req.Infos {
if err := w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition); err != nil { if err := w.node.loader.FromDmlCPLoadDelete(w.ctx, collectionID, info.SeekPosition); err != nil {
return errors.New("watchDeltaChannelsTask failed, error = " + err.Error()) return errors.New("watchDeltaChannelsTask from dml cp load delete failed, error = " + err.Error())
} }
} }

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"testing" "testing"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/msgstream"
@ -452,8 +453,8 @@ func TestTask_watchDeltaChannelsTask(t *testing.T) {
CollectionID: defaultCollectionID, CollectionID: defaultCollectionID,
ChannelName: defaultDeltaChannel, ChannelName: defaultDeltaChannel,
SeekPosition: &internalpb.MsgPosition{ SeekPosition: &internalpb.MsgPosition{
ChannelName: defaultDeltaChannel, ChannelName: defaultDMLChannel,
MsgID: []byte{1, 2, 3}, MsgID: pulsar.EarliestMessageID().Serialize(),
MsgGroup: defaultSubName, MsgGroup: defaultSubName,
Timestamp: 0, Timestamp: 0,
}, },

View File

@ -110,6 +110,5 @@ func (rc *rmqClient) BytesToMsgID(id []byte) (MessageID, error) {
} }
func (rc *rmqClient) Close() { func (rc *rmqClient) Close() {
// TODO(yukun): What to do here? rc.client.Close()
// rc.client.Close()
} }

View File

@ -25,6 +25,7 @@ type RmqConsumer struct {
closeCh chan struct{} closeCh chan struct{}
once sync.Once once sync.Once
skip int32 skip int32
wg sync.WaitGroup
} }
// Subscription returns the subscription name of this consumer // Subscription returns the subscription name of this consumer
@ -37,7 +38,9 @@ func (rc *RmqConsumer) Chan() <-chan Message {
if rc.msgChannel == nil { if rc.msgChannel == nil {
rc.once.Do(func() { rc.once.Do(func() {
rc.msgChannel = make(chan Message, 256) rc.msgChannel = make(chan Message, 256)
rc.wg.Add(1)
go func() { go func() {
defer rc.wg.Done()
for { //nolint:gosimple for { //nolint:gosimple
select { select {
case msg, ok := <-rc.c.Chan(): case msg, ok := <-rc.c.Chan():
@ -78,6 +81,6 @@ func (rc *RmqConsumer) Ack(message Message) {
// Close is used to free the resources of this consumer // Close is used to free the resources of this consumer
func (rc *RmqConsumer) Close() { func (rc *RmqConsumer) Close() {
rc.c.Close()
close(rc.closeCh) close(rc.closeCh)
rc.wg.Wait()
} }

View File

@ -38,5 +38,6 @@ func (rp *rmqProducer) Send(ctx context.Context, message *ProducerMessage) (Mess
// Close does nothing currently // Close does nothing currently
func (rp *rmqProducer) Close() { func (rp *rmqProducer) Close() {
// TODO(yukun): may need to destroy topic //TODO: close producer. Now it has bug
//rp.p.Close()
} }

View File

@ -182,7 +182,6 @@ func (c *client) CreateReader(readerOptions ReaderOptions) (Reader, error) {
// Close close the channel to notify rocksmq to stop operation and close rocksmq server // Close close the channel to notify rocksmq to stop operation and close rocksmq server
func (c *client) Close() { func (c *client) Close() {
// TODO(yukun): Should call server.close() here?
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
close(c.closeCh) close(c.closeCh)
c.wg.Wait() c.wg.Wait()

View File

@ -1002,6 +1002,20 @@ func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader {
return nil return nil
} }
func (rmq *rocksmq) getAndDeleteReader(topicName, readerName string) *rocksmqReader {
if vals, ok := rmq.readers.Load(topicName); ok {
readers := vals.([]*rocksmqReader)
for i, v := range vals.([]*rocksmqReader) {
if v.readerName == readerName {
readers[i] = readers[len(readers)-1]
rmq.readers.Store(topicName, readers[:len(readers)-1])
return v
}
}
}
return nil
}
// ReaderSeek seek a reader to the pointed position // ReaderSeek seek a reader to the pointed position
func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) error { func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) error {
if rmq.isClosed() { if rmq.isClosed() {
@ -1046,10 +1060,11 @@ func (rmq *rocksmq) CloseReader(topicName string, readerName string) {
if rmq.isClosed() { if rmq.isClosed() {
return return
} }
reader := rmq.getReader(topicName, readerName) reader := rmq.getAndDeleteReader(topicName, readerName)
if reader == nil { if reader == nil {
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
return return
} }
reader.Close() reader.Close()
reader = nil
} }