From ba6d33cd57fa35a77b5b343d32de8884086e71eb Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 28 Feb 2024 18:43:07 +0800 Subject: [PATCH] fix: Support TLS for kafka connection (#30468) #27977 Add extra configurations in milvus.yaml to pass certificates for kafka. Signed-off-by: yhmo --- configs/milvus.yaml | 6 +++ .../msgstream/mqwrapper/kafka/kafka_client.go | 14 +++++- pkg/util/paramtable/service_param.go | 46 +++++++++++++++++++ pkg/util/paramtable/service_param_test.go | 5 ++ 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index b3b56f4341..df57c561a5 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -126,6 +126,12 @@ pulsar: # saslMechanisms: PLAIN # securityProtocol: SASL_SSL # readTimeout: 10 # read message timeout in seconds +# ssl: +# enabled: false # Whether to support kafka secure connection mode +# tlsCert: /path/to/client.pem # path to client's public key +# tlsKey: /path/to/client.key # path to client's private key +# tlsCACert: /path/to/ca-cert # file or directory path to CA certificate +# tlsKeyPassword: "" # private key passphrase for use with private key, if any rocksmq: # The path where the message is stored in rocksmq diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index d4d03b6e2c..2df2e8d57e 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -73,13 +73,25 @@ func NewKafkaClientInstanceWithConfig(ctx context.Context, config *paramtable.Ka panic("enable security mode need config username and password at the same time!") } + if config.SecurityProtocol.GetValue() != "" { + kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue()) + } + if config.SaslUsername.GetValue() != "" && config.SaslPassword.GetValue() != "" { kafkaConfig.SetKey("sasl.mechanisms", config.SaslMechanisms.GetValue()) - kafkaConfig.SetKey("security.protocol", config.SecurityProtocol.GetValue()) kafkaConfig.SetKey("sasl.username", config.SaslUsername.GetValue()) kafkaConfig.SetKey("sasl.password", config.SaslPassword.GetValue()) } + if config.KafkaUseSSL.GetAsBool() { + kafkaConfig.SetKey("ssl.certificate.location", config.KafkaTLSCert.GetValue()) + kafkaConfig.SetKey("ssl.key.location", config.KafkaTLSKey.GetValue()) + kafkaConfig.SetKey("ssl.ca.location", config.KafkaTLSCACert.GetValue()) + if config.KafkaTLSKeyPassword.GetValue() != "" { + kafkaConfig.SetKey("ssl.key.password", config.KafkaTLSKeyPassword.GetValue()) + } + } + specExtraConfig := func(config map[string]string) kafka.ConfigMap { kafkaConfigMap := make(kafka.ConfigMap, len(config)) for k, v := range config { diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index ca6136d361..94b08e4807 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -684,6 +684,11 @@ type KafkaConfig struct { SaslPassword ParamItem `refreshable:"false"` SaslMechanisms ParamItem `refreshable:"false"` SecurityProtocol ParamItem `refreshable:"false"` + KafkaUseSSL ParamItem `refreshable:"false"` + KafkaTLSCert ParamItem `refreshable:"false"` + KafkaTLSKey ParamItem `refreshable:"false"` + KafkaTLSCACert ParamItem `refreshable:"false"` + KafkaTLSKeyPassword ParamItem `refreshable:"false"` ConsumerExtraConfig ParamGroup `refreshable:"false"` ProducerExtraConfig ParamGroup `refreshable:"false"` ReadTimeout ParamItem `refreshable:"true"` @@ -731,6 +736,47 @@ func (k *KafkaConfig) Init(base *BaseTable) { } k.SecurityProtocol.Init(base.mgr) + k.KafkaUseSSL = ParamItem{ + Key: "kafka.ssl.enabled", + DefaultValue: "false", + Version: "2.3.8", + Doc: "whether to enable ssl mode", + Export: true, + } + k.KafkaUseSSL.Init(base.mgr) + + k.KafkaTLSCert = ParamItem{ + Key: "kafka.ssl.tlsCert", + Version: "2.3.8", + Doc: "path to client's public key (PEM) used for authentication", + Export: true, + } + k.KafkaTLSCert.Init(base.mgr) + + k.KafkaTLSKey = ParamItem{ + Key: "kafka.ssl.tlsKey", + Version: "2.3.8", + Doc: "path to client's private key (PEM) used for authentication", + Export: true, + } + k.KafkaTLSKey.Init(base.mgr) + + k.KafkaTLSCACert = ParamItem{ + Key: "kafka.ssl.tlsCaCert", + Version: "2.3.8", + Doc: "file or directory path to CA certificate(s) for verifying the broker's key", + Export: true, + } + k.KafkaTLSCACert.Init(base.mgr) + + k.KafkaTLSKeyPassword = ParamItem{ + Key: "kafka.ssl.tlsKeyPassword", + Version: "2.3.8", + Doc: "private key passphrase for use with ssl.key.location and set_ssl_cert(), if any", + Export: true, + } + k.KafkaTLSKeyPassword.Init(base.mgr) + k.ConsumerExtraConfig = ParamGroup{ KeyPrefix: "kafka.consumer.", Version: "2.2.0", diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index e0fd6ab52a..6c56484b98 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -169,6 +169,11 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN") assert.Equal(t, kc.SecurityProtocol.GetValue(), "SASL_SSL") assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second) + assert.Equal(t, kc.KafkaUseSSL.GetAsBool(), false) + assert.Empty(t, kc.KafkaTLSCACert.GetValue()) + assert.Empty(t, kc.KafkaTLSCert.GetValue()) + assert.Empty(t, kc.KafkaTLSKey.GetValue()) + assert.Empty(t, kc.KafkaTLSKeyPassword.GetValue()) } })