Fix kafka consumer connection leak (#27224)

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/27361/head
jaime 2023-09-26 10:31:27 +08:00 committed by GitHub
parent 7dd0be1b2c
commit 7119cb29ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 12 deletions

View File

@ -202,7 +202,7 @@ generated-proto: download-milvus-proto build-3rdparty
@(env bash $(PWD)/scripts/generate_proto.sh)
build-cpp: generated-proto
@echo "Building Milvus cpp library ... ${AZURE_OPTION}"
@echo "Building Milvus cpp library ..."
@(env bash $(PWD)/scripts/core_build.sh -t ${mode} -f "$(CUSTOM_THIRDPARTY_PATH)" -n ${disk_index} -y ${use_dynamic_simd} ${AZURE_OPTION} -x ${index_engine})
build-cpp-gpu: generated-proto

View File

@ -35,6 +35,7 @@ func TestMain(m *testing.M) {
broker := mockCluster.BootstrapServers()
Params.Save("kafka.brokerList", broker)
log.Info("start testing kafka broker", zap.String("address", broker))
exitCode := m.Run()
os.Exit(exitCode)

View File

@ -126,16 +126,6 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
for {
select {
case <-kc.closeCh:
log.Info("close consumer ", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
start := time.Now()
err := kc.c.Close()
if err != nil {
log.Warn("failed to close ", zap.String("topic", kc.topic), zap.Error(err))
}
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
}
if kc.msgChannel != nil {
close(kc.msgChannel)
}
@ -258,9 +248,25 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
return nil
}
func (kc *Consumer) closeInternal() {
log.Info("close consumer ", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
start := time.Now()
err := kc.c.Close()
if err != nil {
log.Warn("failed to close ", zap.String("topic", kc.topic), zap.Error(err))
}
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
}
}
func (kc *Consumer) Close() {
kc.closeOnce.Do(func() {
close(kc.closeCh)
kc.wg.Wait() // wait worker exist and close the client
// wait work goroutine exit
kc.wg.Wait()
// close the client
kc.closeInternal()
})
}

View File

@ -268,3 +268,31 @@ func TestKafkaConsumer_CheckPreTopicValid(t *testing.T) {
err = consumer.CheckTopicValid(topic)
assert.NoError(t, err)
}
func TestKafkaConsumer_Close(t *testing.T) {
rand.Seed(time.Now().UnixNano())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
data1 := []int{111, 222, 333}
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)
t.Run("close after only get latest msgID", func(t *testing.T) {
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
assert.NoError(t, err)
_, err = consumer.GetLatestMsgID()
assert.NoError(t, err)
consumer.Close()
})
t.Run("close after only Chan method is invoked", func(t *testing.T) {
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, 16, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
assert.NoError(t, err)
<-consumer.Chan()
consumer.Close()
})
}