fix: kafka use shared channel to receive produce result (#38534)

issue: #38531
pr: #38532

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/38548/head
Zhen Ye 2024-12-18 10:36:49 +08:00 committed by GitHub
parent 70d2b58533
commit 2e2f61538f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 19 additions and 23 deletions

View File

@ -214,8 +214,7 @@ func (kc *kafkaClient) CreateProducer(options common.ProducerOptions) (mqwrapper
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc()
deliveryChan := make(chan kafka.Event, 128)
producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: options.Topic}
producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: options.Topic}
return producer, nil
}

View File

@ -18,11 +18,11 @@ import (
)
type kafkaProducer struct {
p *kafka.Producer
topic string
deliveryChan chan kafka.Event
closeOnce sync.Once
isClosed bool
p *kafka.Producer
topic string
closeOnce sync.Once
isClosed bool
stopCh chan struct{}
}
func (kp *kafkaProducer) Topic() string {
@ -44,24 +44,28 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes
header := kafka.Header{Key: key, Value: []byte(value)}
headers = append(headers, header)
}
resultCh := make(chan kafka.Event, 1)
err := kp.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
Value: message.Payload,
Headers: headers,
}, kp.deliveryChan)
}, resultCh)
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
return nil, err
}
e, ok := <-kp.deliveryChan
if !ok {
var m *kafka.Message
select {
case <-kp.stopCh:
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic))
return nil, common.NewIgnorableError(fmt.Errorf("delivery chan of kafka producer is closed"))
log.Error("kafka produce message fail because of kafka producer is closed", zap.String("topic", kp.topic))
return nil, common.NewIgnorableError(fmt.Errorf("kafka producer is closed"))
case e := <-resultCh:
m = e.(*kafka.Message)
}
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
return nil, m.TopicPartition.Error
@ -85,8 +89,7 @@ func (kp *kafkaProducer) Close() {
log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.String("topic", kp.topic))
}
close(kp.deliveryChan)
close(kp.stopCh)
cost := time.Since(start).Milliseconds()
if cost > 500 {
log.Debug("kafka producer is closed", zap.String("topic", kp.topic), zap.Int64("time cost(ms)", cost))

View File

@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
@ -44,26 +43,21 @@ func TestKafkaProducer_SendSuccess(t *testing.T) {
func TestKafkaProducer_SendFail(t *testing.T) {
kafkaAddress := getKafkaBrokerList()
{
deliveryChan := make(chan kafka.Event, 1)
rand.Seed(time.Now().UnixNano())
topic := fmt.Sprintf("test-topic-%d", rand.Int())
pp, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaAddress})
assert.NoError(t, err)
producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: topic}
producer := &kafkaProducer{p: pp, stopCh: make(chan struct{}), topic: topic}
close(producer.stopCh)
msg := &common.ProducerMessage{
Payload: []byte{1},
Properties: map[string]string{},
}
var resultMsg kafka.Event = &kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}}
deliveryChan <- resultMsg
ret, err := producer.Send(context.TODO(), msg)
assert.Nil(t, ret)
assert.Error(t, err)
producer.Close()
}
}