fix: Support TLS for kafka connection (#30468)

#27977

Add extra configurations in milvus.yaml to pass certificates for kafka.

Signed-off-by: yhmo <yihua.mo@zilliz.com>
pull/30372/head
groot 2024-02-28 18:43:07 +08:00 committed by GitHub
parent e2f35954d4
commit ba6d33cd57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 70 additions and 1 deletions

View File

@ -126,6 +126,12 @@ pulsar:
# saslMechanisms: PLAIN # saslMechanisms: PLAIN
# securityProtocol: SASL_SSL # securityProtocol: SASL_SSL
# readTimeout: 10 # read message timeout in seconds # readTimeout: 10 # read message timeout in seconds
# ssl:
# enabled: false # Whether to support kafka secure connection mode
# tlsCert: /path/to/client.pem # path to client's public key
# tlsKey: /path/to/client.key # path to client's private key
# tlsCACert: /path/to/ca-cert # file or directory path to CA certificate
# tlsKeyPassword: "" # private key passphrase for use with private key, if any
rocksmq: rocksmq:
# The path where the message is stored in rocksmq # The path where the message is stored in rocksmq

View File

@ -73,13 +73,25 @@ func NewKafkaClientInstanceWithConfig(ctx context.Context, config *paramtable.Ka
panic("enable security mode need config username and password at the same time!") panic("enable security mode need config username and password at the same time!")
} }
if config.SecurityProtocol.GetValue() != "" {
kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue())
}
if config.SaslUsername.GetValue() != "" && config.SaslPassword.GetValue() != "" { if config.SaslUsername.GetValue() != "" && config.SaslPassword.GetValue() != "" {
kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms.GetValue()) kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms.GetValue())
kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue())
kafkaConfig.SetKey("sasl.username", config.SaslUsername.GetValue()) kafkaConfig.SetKey("sasl.username", config.SaslUsername.GetValue())
kafkaConfig.SetKey("sasl.password", config.SaslPassword.GetValue()) kafkaConfig.SetKey("sasl.password", config.SaslPassword.GetValue())
} }
if config.KafkaUseSSL.GetAsBool() {
kafkaConfig.SetKey("ssl.certificate.location", config.KafkaTLSCert.GetValue())
kafkaConfig.SetKey("ssl.key.location", config.KafkaTLSKey.GetValue())
kafkaConfig.SetKey("ssl.ca.location", config.KafkaTLSCACert.GetValue())
if config.KafkaTLSKeyPassword.GetValue() != "" {
kafkaConfig.SetKey("ssl.key.password", config.KafkaTLSKeyPassword.GetValue())
}
}
specExtraConfig := func(config map[string]string) kafka.ConfigMap { specExtraConfig := func(config map[string]string) kafka.ConfigMap {
kafkaConfigMap := make(kafka.ConfigMap, len(config)) kafkaConfigMap := make(kafka.ConfigMap, len(config))
for k, v := range config { for k, v := range config {

View File

@ -684,6 +684,11 @@ type KafkaConfig struct {
SaslPassword ParamItem `refreshable:"false"` SaslPassword ParamItem `refreshable:"false"`
SaslMechanisms ParamItem `refreshable:"false"` SaslMechanisms ParamItem `refreshable:"false"`
SecurityProtocol ParamItem `refreshable:"false"` SecurityProtocol ParamItem `refreshable:"false"`
KafkaUseSSL ParamItem `refreshable:"false"`
KafkaTLSCert ParamItem `refreshable:"false"`
KafkaTLSKey ParamItem `refreshable:"false"`
KafkaTLSCACert ParamItem `refreshable:"false"`
KafkaTLSKeyPassword ParamItem `refreshable:"false"`
ConsumerExtraConfig ParamGroup `refreshable:"false"` ConsumerExtraConfig ParamGroup `refreshable:"false"`
ProducerExtraConfig ParamGroup `refreshable:"false"` ProducerExtraConfig ParamGroup `refreshable:"false"`
ReadTimeout ParamItem `refreshable:"true"` ReadTimeout ParamItem `refreshable:"true"`
@ -731,6 +736,47 @@ func (k *KafkaConfig) Init(base *BaseTable) {
} }
k.SecurityProtocol.Init(base.mgr) k.SecurityProtocol.Init(base.mgr)
k.KafkaUseSSL = ParamItem{
Key: "kafka.ssl.enabled",
DefaultValue: "false",
Version: "2.3.8",
Doc: "whether to enable ssl mode",
Export: true,
}
k.KafkaUseSSL.Init(base.mgr)
k.KafkaTLSCert = ParamItem{
Key: "kafka.ssl.tlsCert",
Version: "2.3.8",
Doc: "path to client's public key (PEM) used for authentication",
Export: true,
}
k.KafkaTLSCert.Init(base.mgr)
k.KafkaTLSKey = ParamItem{
Key: "kafka.ssl.tlsKey",
Version: "2.3.8",
Doc: "path to client's private key (PEM) used for authentication",
Export: true,
}
k.KafkaTLSKey.Init(base.mgr)
k.KafkaTLSCACert = ParamItem{
Key: "kafka.ssl.tlsCaCert",
Version: "2.3.8",
Doc: "file or directory path to CA certificate(s) for verifying the broker's key",
Export: true,
}
k.KafkaTLSCACert.Init(base.mgr)
k.KafkaTLSKeyPassword = ParamItem{
Key: "kafka.ssl.tlsKeyPassword",
Version: "2.3.8",
Doc: "private key passphrase for use with ssl.key.location and set_ssl_cert(), if any",
Export: true,
}
k.KafkaTLSKeyPassword.Init(base.mgr)
k.ConsumerExtraConfig = ParamGroup{ k.ConsumerExtraConfig = ParamGroup{
KeyPrefix: "kafka.consumer.", KeyPrefix: "kafka.consumer.",
Version: "2.2.0", Version: "2.2.0",

View File

@ -169,6 +169,11 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN") assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN")
assert.Equal(t, kc.SecurityProtocol.GetValue(), "SASL_SSL") assert.Equal(t, kc.SecurityProtocol.GetValue(), "SASL_SSL")
assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second) assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second)
assert.Equal(t, kc.KafkaUseSSL.GetAsBool(), false)
assert.Empty(t, kc.KafkaTLSCACert.GetValue())
assert.Empty(t, kc.KafkaTLSCert.GetValue())
assert.Empty(t, kc.KafkaTLSKey.GetValue())
assert.Empty(t, kc.KafkaTLSKeyPassword.GetValue())
} }
}) })