mirror of https://github.com/milvus-io/milvus.git
Fix kafka producer init many times (#26314)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>pull/26319/head
parent
83910593ca
commit
a3c176045d
|
@ -6,16 +6,21 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||||
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||||
)
|
)
|
||||||
|
|
||||||
var producer *kafka.Producer
|
var (
|
||||||
|
producer atomic.Pointer[kafka.Producer]
|
||||||
|
sf conc.Singleflight[*kafka.Producer]
|
||||||
|
)
|
||||||
|
|
||||||
var once sync.Once
|
var once sync.Once
|
||||||
|
|
||||||
|
@ -85,15 +90,21 @@ func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
|
func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
|
||||||
|
if p := producer.Load(); p != nil {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
p, err, _ := sf.Do("kafka_producer", func() (*kafka.Producer, error) {
|
||||||
|
if p := producer.Load(); p != nil {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
config := kc.newProducerConfig()
|
config := kc.newProducerConfig()
|
||||||
producer, err := kafka.NewProducer(config)
|
p, err := kafka.NewProducer(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("create sync kafka producer failed", zap.Error(err))
|
log.Error("create sync kafka producer failed", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
once.Do(func() {
|
|
||||||
go func() {
|
go func() {
|
||||||
for e := range producer.Events() {
|
for e := range p.Events() {
|
||||||
switch ev := e.(type) {
|
switch ev := e.(type) {
|
||||||
case kafka.Error:
|
case kafka.Error:
|
||||||
// Generic client instance-level errors, such as broker connection failures,
|
// Generic client instance-level errors, such as broker connection failures,
|
||||||
|
@ -109,9 +120,13 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
producer.Store(p)
|
||||||
|
return p, nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
return producer, nil
|
return nil, err
|
||||||
|
}
|
||||||
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
|
func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
|
||||||
|
|
Loading…
Reference in New Issue