mirror of https://github.com/milvus-io/milvus.git
Close kafka internal consumer&producer properly (#24998)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/25047/head
parent
5bd93f2432
commit
4fda4f3951
|
@ -24,6 +24,7 @@ type Consumer struct {
|
|||
chanOnce sync.Once
|
||||
closeOnce sync.Once
|
||||
closeCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
const timeout = 3000
|
||||
|
@ -118,7 +119,9 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
|
|||
panic("failed to chan a kafka consumer without assign")
|
||||
}
|
||||
kc.chanOnce.Do(func() {
|
||||
kc.wg.Add(1)
|
||||
go func() {
|
||||
defer kc.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-kc.closeCh:
|
||||
|
@ -248,5 +251,7 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
|
|||
func (kc *Consumer) Close() {
|
||||
kc.closeOnce.Do(func() {
|
||||
close(kc.closeCh)
|
||||
kc.wg.Wait()
|
||||
kc.c.Close()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -67,5 +67,7 @@ func (kp *kafkaProducer) Close() {
|
|||
if cost > 500 {
|
||||
log.Debug("kafka producer is closed", zap.Any("topic", kp.topic), zap.Int64("time cost(ms)", cost))
|
||||
}
|
||||
|
||||
kp.p.Close()
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue