mirror of https://github.com/milvus-io/milvus.git
Use a singleton kafka producer (#16739)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
branch: pull/16757/head
parent 5b2b917987
commit bb9ccbb7e2
@@ -2,6 +2,7 @@ package kafka
 
 import (
 	"strconv"
+	"sync"
 
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/milvus-io/milvus/internal/log"
@@ -9,6 +10,9 @@ import (
 	"go.uber.org/zap"
 )
 
+var Producer *kafka.Producer
+var once sync.Once
+
 type kafkaClient struct {
 	// more configs you can see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 	basicConfig kafka.ConfigMap
@@ -34,6 +38,21 @@ func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap {
 	return &newConfig
 }
 
+func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
+	var err error
+	once.Do(func() {
+		config := kc.newProducerConfig()
+		Producer, err = kafka.NewProducer(config)
+	})
+
+	if err != nil {
+		log.Error("create sync kafka producer failed", zap.Error(err))
+		return nil, err
+	}
+
+	return Producer, nil
+}
+
 func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
 	newConf := cloneKafkaConfig(kc.basicConfig)
 	// default max message size 5M
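The getKafkaProducer added above funnels every caller through a package-level Producer guarded by sync.Once, so the underlying connection is built at most once per process. A minimal, self-contained sketch of that pattern follows; the broker address, names, and main function are illustrative assumptions, not Milvus code.

package main

import (
	"fmt"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
	producer     *kafka.Producer
	producerOnce sync.Once
)

// getProducer builds the producer at most once; every later call returns the
// same instance (and the construction error is only seen by the first caller).
func getProducer() (*kafka.Producer, error) {
	var err error
	producerOnce.Do(func() {
		producer, err = kafka.NewProducer(&kafka.ConfigMap{
			"bootstrap.servers": "localhost:9092", // assumed broker address
		})
	})
	if err != nil {
		return nil, err
	}
	return producer, nil
}

func main() {
	p1, _ := getProducer()
	p2, _ := getProducer()
	fmt.Println(p1 == p2) // true: all callers share one *kafka.Producer
}

Because Once.Do runs its function exactly once even under concurrent callers, every CreateProducer call can hand out the same underlying connection without extra locking.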
@@ -41,6 +60,7 @@ func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
 	newConf.SetKey("compression.codec", "zstd")
 	newConf.SetKey("go.events.channel.size", 0)
 	newConf.SetKey("go.produce.channel.size", 0)
+	newConf.SetKey("linger.ms", 20)
 	return newConf
 }
 
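For reference, a small sketch of how a producer ConfigMap with the keys set above might be assembled outside Milvus; the bootstrap.servers value and the helper name are assumed placeholders. linger.ms=20 lets librdkafka batch messages for up to 20 ms before sending, and the two go.*.channel.size keys make the Go client's internal channels unbuffered.

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// buildProducerConfig is a hypothetical helper mirroring the settings above.
func buildProducerConfig() *kafka.ConfigMap {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"} // assumed broker address
	conf.SetKey("compression.codec", "zstd")  // compress record batches with zstd
	conf.SetKey("go.events.channel.size", 0)  // unbuffered Events() channel
	conf.SetKey("go.produce.channel.size", 0) // unbuffered ProduceChannel()
	conf.SetKey("linger.ms", 20)              // batch for up to 20 ms before sending
	return &conf
}

func main() {
	fmt.Println(buildProducerConfig())
}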
@@ -68,10 +88,8 @@ func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.Subscrip
 }
 
 func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) {
-	config := kc.newProducerConfig()
-	pp, err := kafka.NewProducer(config)
-
+	pp, err := kc.getKafkaProducer()
 	if err != nil {
 		log.Error("kafka create sync producer , error", zap.Error(err))
 		return nil, err
 	}
@@ -5,6 +5,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/milvus-io/milvus/internal/log"
 	"go.uber.org/zap"
 
@@ -24,22 +26,38 @@ func (kp *kafkaProducer) Topic() string {
 }
 
 func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
-	err := kp.p.Produce(&kafka.Message{
-		TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
-		Value:          message.Payload,
-	}, kp.deliveryChan)
+	var err error
+	maxAttempt := 3
 
-	if err != nil {
-		return nil, err
+	// In order to avoid https://github.com/confluentinc/confluent-kafka-go/issues/769,
+	// just retry produce again when getting a nil from delivery chan.
+	for i := 0; i < maxAttempt; i++ {
+		err = kp.p.Produce(&kafka.Message{
+			TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
+			Value:          message.Payload,
+		}, kp.deliveryChan)
+
+		if err != nil {
+			break
+		}
+
+		e := <-kp.deliveryChan
+		if e == nil {
+			errMsg := "produce message arise exception, delivery Chan return a nil value"
+			err = errors.New(errMsg)
+			log.Warn(errMsg, zap.String("topic", kp.topic), zap.ByteString("msg", message.Payload), zap.Int("retries", i))
+			continue
+		}
+
+		m := e.(*kafka.Message)
+		if m.TopicPartition.Error != nil {
+			return nil, m.TopicPartition.Error
+		}
+
+		return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
 	}
 
-	e := <-kp.deliveryChan
-	m := e.(*kafka.Message)
-	if m.TopicPartition.Error != nil {
-		return nil, m.TopicPartition.Error
-	}
-
-	return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
+	return nil, err
 }
 
 func (kp *kafkaProducer) Close() {
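The rewritten Send above wraps the produce/delivery-channel handshake in a retry loop, so a nil delivery event (confluent-kafka-go issue #769) leads to another attempt instead of a panic on the type assertion. Below is a standalone sketch of the same handshake; the broker address, topic, payload, and attempt count are assumptions for illustration, and completing a send requires a reachable broker.

package main

import (
	"errors"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceSync sends one message and waits for its delivery report, retrying
// when the delivery channel yields a nil event.
func produceSync(p *kafka.Producer, topic string, payload []byte) (int64, error) {
	deliveryChan := make(chan kafka.Event, 1)

	var err error
	const maxAttempt = 3
	for i := 0; i < maxAttempt; i++ {
		err = p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          payload,
		}, deliveryChan)
		if err != nil {
			break // enqueueing failed, so no delivery event will arrive
		}

		e := <-deliveryChan
		if e == nil {
			// Rare nil delivery event: record it and produce again.
			err = errors.New("nil delivery event")
			log.Printf("attempt %d on topic %s: %v", i, topic, err)
			continue
		}

		m := e.(*kafka.Message)
		if m.TopicPartition.Error != nil {
			return 0, m.TopicPartition.Error
		}
		return int64(m.TopicPartition.Offset), nil
	}
	return 0, err
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}) // assumed broker
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	offset, sendErr := produceSync(p, "test-topic", []byte("hello")) // assumed topic and payload
	log.Println(offset, sendErr)
}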
@@ -48,7 +66,6 @@ func (kp *kafkaProducer) Close() {
 	//flush in-flight msg within queue.
 	kp.p.Flush(10000)
 
 	kp.p.Close()
-	close(kp.deliveryChan)
 
 	cost := time.Since(start).Milliseconds()
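The Close hunk above keeps the 10-second Flush before Close but no longer closes the producer's delivery channel. As a reminder of the Flush contract, a small sketch: Flush blocks for up to the given timeout in milliseconds and returns how many events are still undelivered. The names and broker address here are assumed for illustration.

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// shutdown flushes outstanding deliveries for up to 10 s, then closes the producer.
func shutdown(p *kafka.Producer) {
	if remaining := p.Flush(10000); remaining > 0 {
		log.Printf("closing producer with %d undelivered events", remaining)
	}
	p.Close()
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}) // assumed broker
	if err != nil {
		log.Fatal(err)
	}
	shutdown(p)
}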