fix: etcd not connectable when auth enabled (#31668)

Fix etcd config source didn't respect auth enabled
Also removed pulsar recoverable error when pulsar return ConsumerBusy.
It could happen that pulsar didn't find the original consumer is dead
and recover takes some time.
fix: #31631
pr: #31633

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
pull/31787/head
Xiaofan 2024-04-01 00:23:18 -07:00 committed by GitHub
parent f3216bfe18
commit be834638d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 16 additions and 8 deletions

View File

@ -216,8 +216,11 @@ func (c *mck) connectEctd() {
if c.etcdIP != "" {
etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP})
} else {
etcdCli, err = etcd.GetEtcdClient(
etcdCli, err = etcd.CreateEtcdClient(
c.params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
c.params.EtcdCfg.EtcdEnableAuth.GetAsBool(),
c.params.EtcdCfg.EtcdAuthUserName.GetValue(),
c.params.EtcdCfg.EtcdAuthPassword.GetValue(),
c.params.EtcdCfg.EtcdUseSSL.GetAsBool(),
c.params.EtcdCfg.Endpoints.GetAsStrings(),
c.params.EtcdCfg.EtcdTLSCert.GetValue(),

View File

@ -48,8 +48,11 @@ type EtcdSource struct {
func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) {
log.Debug("init etcd source", zap.Any("etcdInfo", etcdInfo))
etcdCli, err := etcd.GetEtcdClient(
etcdCli, err := etcd.CreateEtcdClient(
etcdInfo.UseEmbed,
etcdInfo.EnableAuth,
etcdInfo.UserName,
etcdInfo.PassWord,
etcdInfo.UseSSL,
etcdInfo.Endpoints,
etcdInfo.CertFile,

View File

@ -36,6 +36,9 @@ type Source interface {
// EtcdInfo has attribute for config center source initialization
type EtcdInfo struct {
UseEmbed bool
EnableAuth bool
UserName string
PassWord string
UseSSL bool
Endpoints []string
KeyPrefix string

View File

@ -18,7 +18,6 @@ package pulsar
import (
"fmt"
"strings"
"sync"
"time"
@ -31,7 +30,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@ -122,9 +120,6 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.
})
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
if strings.Contains(err.Error(), "ConsumerBusy") {
return nil, retry.Unrecoverable(err)
}
return nil, err
}

View File

@ -668,7 +668,7 @@ func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) {
_, err := pc.Subscribe(mqwrapper.ConsumerOptions{Topic: "test_topic_name"})
assert.Error(t, err)
assert.False(t, retry.IsRecoverable(err))
assert.True(t, retry.IsRecoverable(err))
})
}

View File

@ -35,6 +35,7 @@ import (
var maxTxnNum = 128
// GetEtcdClient returns etcd client
// should only used for test
func GetEtcdClient(
useEmbedEtcd bool,
useSSL bool,

View File

@ -178,6 +178,9 @@ func (bt *BaseTable) initConfigsFromRemote() {
}
info := &config.EtcdInfo{
UseEmbed: etcdConfig.UseEmbedEtcd.GetAsBool(),
EnableAuth: etcdConfig.EtcdEnableAuth.GetAsBool(),
UserName: etcdConfig.EtcdAuthUserName.GetValue(),
PassWord: etcdConfig.EtcdAuthPassword.GetValue(),
UseSSL: etcdConfig.EtcdUseSSL.GetAsBool(),
Endpoints: etcdConfig.Endpoints.GetAsStrings(),
CertFile: etcdConfig.EtcdTLSCert.GetValue(),