fix: Clean kafka default configuration (#30924)

issue: #30917

Signed-off-by: yhmo <yihua.mo@zilliz.com>
pull/30989/head
groot 2024-03-01 18:17:03 +08:00 committed by GitHub
parent 7783098ddd
commit 85de56e894
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 11 additions and 11 deletions

View File

@ -123,8 +123,8 @@ pulsar:
# brokerList: # brokerList:
# saslUsername: # saslUsername:
# saslPassword: # saslPassword:
# saslMechanisms: PLAIN # saslMechanisms:
# securityProtocol: SASL_SSL # securityProtocol:
# readTimeout: 10 # read message timeout in seconds # readTimeout: 10 # read message timeout in seconds
# ssl: # ssl:
# enabled: false # Whether to support kafka secure connection mode # enabled: false # Whether to support kafka secure connection mode

View File

@ -722,7 +722,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.SaslMechanisms = ParamItem{ k.SaslMechanisms = ParamItem{
Key: "kafka.saslMechanisms", Key: "kafka.saslMechanisms",
DefaultValue: "PLAIN", DefaultValue: "",
Version: "2.1.0", Version: "2.1.0",
Export: true, Export: true,
} }
@ -730,7 +730,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.SecurityProtocol = ParamItem{ k.SecurityProtocol = ParamItem{
Key: "kafka.securityProtocol", Key: "kafka.securityProtocol",
DefaultValue: "SASL_SSL", DefaultValue: "",
Version: "2.1.0", Version: "2.1.0",
Export: true, Export: true,
} }
@ -739,7 +739,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.KafkaUseSSL = ParamItem{ k.KafkaUseSSL = ParamItem{
Key: "kafka.ssl.enabled", Key: "kafka.ssl.enabled",
DefaultValue: "false", DefaultValue: "false",
Version: "2.3.8", Version: "2.3.11",
Doc: "whether to enable ssl mode", Doc: "whether to enable ssl mode",
Export: true, Export: true,
} }
@ -747,7 +747,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.KafkaTLSCert = ParamItem{ k.KafkaTLSCert = ParamItem{
Key: "kafka.ssl.tlsCert", Key: "kafka.ssl.tlsCert",
Version: "2.3.8", Version: "2.3.11",
Doc: "path to client's public key (PEM) used for authentication", Doc: "path to client's public key (PEM) used for authentication",
Export: true, Export: true,
} }
@ -755,7 +755,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.KafkaTLSKey = ParamItem{ k.KafkaTLSKey = ParamItem{
Key: "kafka.ssl.tlsKey", Key: "kafka.ssl.tlsKey",
Version: "2.3.8", Version: "2.3.11",
Doc: "path to client's private key (PEM) used for authentication", Doc: "path to client's private key (PEM) used for authentication",
Export: true, Export: true,
} }
@ -763,7 +763,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.KafkaTLSCACert = ParamItem{ k.KafkaTLSCACert = ParamItem{
Key: "kafka.ssl.tlsCaCert", Key: "kafka.ssl.tlsCaCert",
Version: "2.3.8", Version: "2.3.11",
Doc: "file or directory path to CA certificate(s) for verifying the broker's key", Doc: "file or directory path to CA certificate(s) for verifying the broker's key",
Export: true, Export: true,
} }
@ -771,7 +771,7 @@ func (k *KafkaConfig) Init(base *BaseTable) {
k.KafkaTLSKeyPassword = ParamItem{ k.KafkaTLSKeyPassword = ParamItem{
Key: "kafka.ssl.tlsKeyPassword", Key: "kafka.ssl.tlsKeyPassword",
Version: "2.3.8", Version: "2.3.11",
Doc: "private key passphrase for use with ssl.key.location and set_ssl_cert(), if any", Doc: "private key passphrase for use with ssl.key.location and set_ssl_cert(), if any",
Export: true, Export: true,
} }

View File

@ -166,8 +166,8 @@ func TestServiceParam(t *testing.T) {
base := &BaseTable{mgr: config.NewManager()} base := &BaseTable{mgr: config.NewManager()}
kc.Init(base) kc.Init(base)
assert.Empty(t, kc.Address.GetValue()) assert.Empty(t, kc.Address.GetValue())
assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN") assert.Empty(t, kc.SaslMechanisms.GetValue())
assert.Equal(t, kc.SecurityProtocol.GetValue(), "SASL_SSL") assert.Empty(t, kc.SecurityProtocol.GetValue())
assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second) assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second)
assert.Equal(t, kc.KafkaUseSSL.GetAsBool(), false) assert.Equal(t, kc.KafkaUseSSL.GetAsBool(), false)
assert.Empty(t, kc.KafkaTLSCACert.GetValue()) assert.Empty(t, kc.KafkaTLSCACert.GetValue())