diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index 1a83bb4663..f2bdb4424d 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -251,7 +251,6 @@ func (kc *Consumer) CheckTopicValid(topic string) error { func (kc *Consumer) Close() { kc.closeOnce.Do(func() { close(kc.closeCh) - kc.wg.Wait() - kc.c.Close() + kc.wg.Wait() // wait worker exist and close the client }) }