add user special kafka config (#18742)

Signed-off-by: wgcn <1026688210@qq.com>

Signed-off-by: wgcn <1026688210@qq.com>
Co-authored-by: wanggang11335 <wanggang11335@autohome.com.cn>
pull/18823/head
wgcn 2022-08-25 11:02:53 +08:00 committed by GitHub
parent f3c755602b
commit d443b5420a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 132 additions and 16 deletions

View File

@ -96,7 +96,11 @@ pulsar:
maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar.
# If you want to enable kafka, needs to comment the pulsar configs
#kafka:
kafka:
producer:
client.id: dc
consumer:
client.id: dc1
# brokerList: localhost1:9092,localhost2:9092,localhost3:9092
# saslUsername: username
# saslPassword: password

View File

@ -64,7 +64,9 @@ func (m *Manager) GetConfig(key string) (string, error) {
}
//GetConfigsByPattern returns key values that matched pattern
func (m *Manager) GetConfigsByPattern(pattern string) map[string]string {
// withPrefix : whether key include the prefix of pattern
func (m *Manager) GetConfigsByPattern(pattern string, withPrefix bool) map[string]string {
m.RLock()
defer m.RUnlock()
matchedConfig := make(map[string]string)
@ -77,7 +79,16 @@ func (m *Manager) GetConfigsByPattern(pattern string) map[string]string {
if err != nil {
continue
}
matchedConfig[key] = sValue
checkAndCutOffKey := func() string {
if withPrefix {
return key
}
return strings.Replace(key, pattern, "", 1)
}
finalKey := checkAndCutOffKey()
matchedConfig[finalKey] = sValue
}
}
return matchedConfig

View File

@ -445,7 +445,7 @@ func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChann
"api.version.request": true,
"linger.ms": 10,
}
kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config)
kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config, nil, nil)
inputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
for _, opt := range opts {

View File

@ -1,6 +1,7 @@
package kafka
import (
"fmt"
"strconv"
"sync"
@ -17,7 +18,9 @@ var once sync.Once
type kafkaClient struct {
// more configs you can see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
basicConfig kafka.ConfigMap
basicConfig kafka.ConfigMap
consumerConfig kafka.ConfigMap
producerConfig kafka.ConfigMap
}
func getBasicConfig(address string) kafka.ConfigMap {
@ -31,11 +34,16 @@ func getBasicConfig(address string) kafka.ConfigMap {
func NewKafkaClientInstance(address string) *kafkaClient {
config := getBasicConfig(address)
return &kafkaClient{basicConfig: config}
return NewKafkaClientInstanceWithConfigMap(config, kafka.ConfigMap{}, kafka.ConfigMap{})
}
func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap) *kafkaClient {
return &kafkaClient{basicConfig: config}
func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap, extraConsumerConfig kafka.ConfigMap, extraProducerConfig kafka.ConfigMap) *kafkaClient {
log.Info("init kafka Config ", zap.String("commonConfig", fmt.Sprintf("+%v", config)),
zap.String("extraConsumerConfig", fmt.Sprintf("+%v", extraConsumerConfig)),
zap.String("extraProducerConfig", fmt.Sprintf("+%v", extraProducerConfig)),
)
return &kafkaClient{basicConfig: config, consumerConfig: extraConsumerConfig, producerConfig: extraProducerConfig}
}
func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient {
@ -53,7 +61,16 @@ func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClie
kafkaConfig.SetKey("sasl.password", config.SaslPassword)
}
return &kafkaClient{basicConfig: kafkaConfig}
specExtraConfig := func(config map[string]string) kafka.ConfigMap {
kafkaConfigMap := make(kafka.ConfigMap, len(config))
for k, v := range config {
kafkaConfigMap.SetKey(k, v)
}
return kafkaConfigMap
}
return NewKafkaClientInstanceWithConfigMap(kafkaConfig, specExtraConfig(config.ConsumerExtraConfig), specExtraConfig(config.ProducerExtraConfig))
}
func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap {
@ -103,6 +120,10 @@ func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap {
newConf.SetKey("message.max.bytes", 10485760)
newConf.SetKey("compression.codec", "zstd")
newConf.SetKey("linger.ms", 20)
//special producer config
kc.specialExtraConfig(newConf, kc.producerConfig)
return newConf
}
@ -126,6 +147,9 @@ func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.Subscrip
//newConf.SetKey("enable.partition.eof", true)
newConf.SetKey("go.events.channel.enable", true)
kc.specialExtraConfig(newConf, kc.consumerConfig)
return newConf
}
@ -159,6 +183,16 @@ func (kc *kafkaClient) StringToMsgID(id string) (mqwrapper.MessageID, error) {
return &kafkaID{messageID: offset}, nil
}
func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafka.ConfigMap) {
for k, v := range special {
if existingConf, _ := current.Get(k, nil); existingConf != nil {
log.Warn(fmt.Sprintf("The existing config : %v=%v will be covered by the speciled kafka config : %v.", k, v, existingConf))
}
current.SetKey(k, v)
}
}
func (kc *kafkaClient) BytesToMsgID(id []byte) (mqwrapper.MessageID, error) {
offset := DeserializeKafkaID(id)
return &kafkaID{messageID: offset}, nil

View File

@ -308,6 +308,29 @@ func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) {
client := NewKafkaClientInstanceWithConfig(config3)
assert.NotNil(t, client)
assert.NotNil(t, client.basicConfig)
consumerConfig := make(map[string]string)
consumerConfig["client.id"] = "dc"
config4 := &paramtable.KafkaConfig{Address: "addr", SaslUsername: "username", SaslPassword: "password", ConsumerExtraConfig: consumerConfig}
client4 := NewKafkaClientInstanceWithConfig(config4)
assert.Equal(t, "dc", client4.consumerConfig["client.id"])
newConsumerConfig := client4.newConsumerConfig("test", 0)
clientID, err := newConsumerConfig.Get("client.id", "")
assert.Nil(t, err)
assert.Equal(t, "dc", clientID)
producerConfig := make(map[string]string)
producerConfig["client.id"] = "dc1"
config5 := &paramtable.KafkaConfig{Address: "addr", SaslUsername: "username", SaslPassword: "password", ProducerExtraConfig: producerConfig}
client5 := NewKafkaClientInstanceWithConfig(config5)
assert.Equal(t, "dc1", client5.producerConfig["client.id"])
newProducerConfig := client5.newProducerConfig()
pClientID, err := newProducerConfig.Get("client.id", "")
assert.Nil(t, err)
assert.Equal(t, pClientID, "dc1")
}
func createKafkaClient(t *testing.T) *kafkaClient {

View File

@ -212,7 +212,11 @@ func (gp *BaseTable) Get(key string) string {
}
func (gp *BaseTable) GetByPattern(pattern string) map[string]string {
return gp.mgr.GetConfigsByPattern(pattern)
return gp.mgr.GetConfigsByPattern(pattern, true)
}
func (gp *BaseTable) GetConfigSubSet(pattern string) map[string]string {
return gp.mgr.GetConfigsByPattern(pattern, false)
}
// For compatible reason, only visiable for Test

View File

@ -13,6 +13,7 @@ package paramtable
import (
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
@ -27,6 +28,25 @@ func TestMain(m *testing.M) {
os.Exit(code)
}
func TestBaseTable_GetConfigSubSet(t *testing.T) {
prefix := "rootcoord."
configs := baseParams.mgr.Configs()
configsWithPrefix := make(map[string]string)
for k, v := range configs {
if strings.HasPrefix(k, prefix) {
configsWithPrefix[k] = v
}
}
subSet := baseParams.GetConfigSubSet(prefix)
for k := range configs {
assert.Equal(t, subSet[k], configs[prefix+k])
}
assert.Equal(t, len(subSet), len(configsWithPrefix))
}
func TestBaseTable_SaveAndLoad(t *testing.T) {
err1 := baseParams.Save("int", "10")
assert.Nil(t, err1)

View File

@ -30,6 +30,16 @@ func TestComponentParam(t *testing.T) {
var CParams ComponentParam
CParams.Init()
t.Run("test kafkaConfig", func(t *testing.T) {
params := CParams.ServiceParam.KafkaCfg
producerConfig := params.ProducerExtraConfig
assert.Equal(t, "dc", producerConfig["client.id"])
consumerConfig := params.ConsumerExtraConfig
assert.Equal(t, "dc1", consumerConfig["client.id"])
})
t.Run("test commonConfig", func(t *testing.T) {
Params := CParams.CommonCfg

View File

@ -38,6 +38,8 @@ const (
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
defaultEtcdLogLevel = "info"
defaultEtcdLogPath = "stdout"
KafkaProducerConfigPrefix = "kafka.producer."
KafkaConsumerConfigPrefix = "kafka.consumer."
)
// ServiceParam is used to quickly and easily access all basic service configurations.
@ -365,12 +367,14 @@ func (p *PulsarConfig) initMaxMessageSize() {
// --- kafka ---
type KafkaConfig struct {
Base *BaseTable
Address string
SaslUsername string
SaslPassword string
SaslMechanisms string
SecurityProtocol string
Base *BaseTable
Address string
SaslUsername string
SaslPassword string
SaslMechanisms string
SecurityProtocol string
ConsumerExtraConfig map[string]string
ProducerExtraConfig map[string]string
}
func (k *KafkaConfig) init(base *BaseTable) {
@ -380,6 +384,7 @@ func (k *KafkaConfig) init(base *BaseTable) {
k.initSaslPassword()
k.initSaslMechanisms()
k.initSecurityProtocol()
k.initExtraKafkaConfig()
}
func (k *KafkaConfig) initAddress() {
@ -402,6 +407,11 @@ func (k *KafkaConfig) initSecurityProtocol() {
k.SecurityProtocol = k.Base.LoadWithDefault("kafka.securityProtocol", "SASL_SSL")
}
func (k *KafkaConfig) initExtraKafkaConfig() {
k.ConsumerExtraConfig = k.Base.GetConfigSubSet(KafkaConsumerConfigPrefix)
k.ProducerExtraConfig = k.Base.GetConfigSubSet(KafkaProducerConfigPrefix)
}
///////////////////////////////////////////////////////////////////////////////
// --- rocksmq ---
type RocksmqConfig struct {