Etcd config source support TLS (#20874) (#20910)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/20930/head
Enwei Jiao 2022-12-01 14:19:16 +08:00 committed by GitHub
parent c5f215da67
commit 802f512161
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 709 additions and 138 deletions

View File

@ -220,7 +220,14 @@ func (c *mck) connectEctd() {
if c.etcdIP != "" {
etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP})
} else {
etcdCli, err = etcd.GetEtcdClient(&c.params.EtcdCfg)
etcdCli, err = etcd.GetEtcdClient(
c.params.EtcdCfg.UseEmbedEtcd,
c.params.EtcdCfg.EtcdUseSSL,
c.params.EtcdCfg.Endpoints,
c.params.EtcdCfg.EtcdTLSCert,
c.params.EtcdCfg.EtcdTLSKey,
c.params.EtcdCfg.EtcdTLSCACert,
c.params.EtcdCfg.EtcdTLSMinVersion)
}
if err != nil {
log.Fatal("failed to connect to etcd", zap.Error(err))

View File

@ -214,7 +214,12 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
if Params.EtcdCfg.UseEmbedEtcd {
// Start etcd server.
etcd.InitEtcdServer(&Params.EtcdCfg)
etcd.InitEtcdServer(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.ConfigPath,
Params.EtcdCfg.DataDir,
Params.EtcdCfg.EtcdLogPath,
Params.EtcdCfg.EtcdLogLevel)
defer etcd.StopEtcdServer()
}
} else {

View File

@ -19,7 +19,14 @@ func (b etcdBasedBackend) CleanWithPrefix(prefix string) error {
}
func newEtcdBasedBackend(cfg *configs.MilvusConfig) (*etcdBasedBackend, error) {
etcdCli, err := etcd.GetEtcdClient(cfg.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
cfg.EtcdCfg.UseEmbedEtcd,
cfg.EtcdCfg.EtcdUseSSL,
cfg.EtcdCfg.Endpoints,
cfg.EtcdCfg.EtcdTLSCert,
cfg.EtcdCfg.EtcdTLSKey,
cfg.EtcdCfg.EtcdTLSCACert,
cfg.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
return nil, err
}

View File

@ -73,7 +73,14 @@ func (r *Runner) WatchSessions() {
}
func (r *Runner) initEtcdCli() {
cli, err := etcd.GetEtcdClient(r.cfg.EtcdCfg)
cli, err := etcd.GetEtcdClient(
r.cfg.EtcdCfg.UseEmbedEtcd,
r.cfg.EtcdCfg.EtcdUseSSL,
r.cfg.EtcdCfg.Endpoints,
r.cfg.EtcdCfg.EtcdTLSCert,
r.cfg.EtcdCfg.EtcdTLSKey,
r.cfg.EtcdCfg.EtcdTLSCACert,
r.cfg.EtcdCfg.EtcdTLSMinVersion)
console.AbnormalExitIf(err, r.backupFinished.Load())
r.etcdCli = cli
}

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
@ -46,23 +47,28 @@ type EtcdSource struct {
eh EventHandler
}
func NewEtcdSource(remoteInfo *EtcdInfo) (*EtcdSource, error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: remoteInfo.Endpoints,
DialTimeout: 5 * time.Second,
})
func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) {
etcdCli, err := etcd.GetEtcdClient(
etcdInfo.UseEmbed,
etcdInfo.UseSSL,
etcdInfo.Endpoints,
etcdInfo.CertFile,
etcdInfo.KeyFile,
etcdInfo.CaCertFile,
etcdInfo.MinVersion)
if err != nil {
return nil, err
}
return &EtcdSource{
es := &EtcdSource{
etcdCli: etcdCli,
ctx: context.Background(),
currentConfig: make(map[string]string),
keyPrefix: remoteInfo.KeyPrefix,
refreshMode: remoteInfo.RefreshMode,
refreshInterval: remoteInfo.RefreshInterval,
keyPrefix: etcdInfo.KeyPrefix,
refreshMode: etcdInfo.RefreshMode,
refreshInterval: etcdInfo.RefreshInterval,
intervalDone: make(chan bool, 1),
}, nil
}
return es, nil
}
// GetConfigurationByKey implements ConfigSource

View File

@ -38,8 +38,14 @@ type EventHandler interface {
// EtcdInfo has attribute for config center source initialization
type EtcdInfo struct {
Endpoints []string
KeyPrefix string
UseEmbed bool
UseSSL bool
Endpoints []string
KeyPrefix string
CertFile string
KeyFile string
CaCertFile string
MinVersion string
RefreshMode int
//Pull Configuration interval, unit is second

View File

@ -48,7 +48,14 @@ func Test_garbageCollector_basic(t *testing.T) {
meta, err := newMemoryMeta()
assert.Nil(t, err)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
segRefer, err := NewSegmentReferenceManager(etcdKV, nil)
@ -110,7 +117,14 @@ func Test_garbageCollector_scan(t *testing.T) {
meta, err := newMemoryMeta()
assert.Nil(t, err)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
segRefer, err := NewSegmentReferenceManager(etcdKV, nil)

View File

@ -36,7 +36,14 @@ func Test_SegmentReferenceManager(t *testing.T) {
var segRefer *SegmentReferenceManager
var err error
Params.Init()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, "unittest")

View File

@ -913,7 +913,14 @@ func TestService_WatchServices(t *testing.T) {
func TestServer_watchQueryCoord(t *testing.T) {
Params.Init()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
assert.NotNil(t, etcdKV)
@ -3318,7 +3325,14 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
factory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
@ -3357,7 +3371,14 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ..
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
factory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
@ -3405,7 +3426,14 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
factory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
@ -3599,7 +3627,14 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server {
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
factory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())

View File

@ -83,7 +83,14 @@ func TestDataNode(t *testing.T) {
defer cancel()
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
node.SetEtcdClient(etcdCli)
@ -654,7 +661,14 @@ func TestDataNode(t *testing.T) {
chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1"
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
node.SetEtcdClient(etcdCli)
@ -747,7 +761,14 @@ func TestDataNode_AddSegment(t *testing.T) {
defer cancel()
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
node.SetEtcdClient(etcdCli)
@ -816,7 +837,14 @@ func TestDataNode_AddSegment(t *testing.T) {
func TestWatchChannel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
node.SetEtcdClient(etcdCli)
@ -1087,7 +1115,14 @@ func TestDataNode_GetComponentStates(t *testing.T) {
}
func TestDataNode_ResendSegmentStats(t *testing.T) {
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-ResendSegmentStats"

View File

@ -33,7 +33,14 @@ func TestFlowGraphManager(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()

View File

@ -134,7 +134,14 @@ func makeNewChannelNames(names []string, suffix string) []string {
}
func clearEtcd(rootPath string) error {
client, err := etcd.GetEtcdClient(&Params.EtcdCfg)
client, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
return err
}

View File

@ -33,7 +33,14 @@ func Test_NewClient(t *testing.T) {
proxy.Params.InitOnce()
ctx := context.Background()
etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
proxy.Params.EtcdCfg.UseEmbedEtcd,
proxy.Params.EtcdCfg.EtcdUseSSL,
proxy.Params.EtcdCfg.Endpoints,
proxy.Params.EtcdCfg.EtcdTLSCert,
proxy.Params.EtcdCfg.EtcdTLSKey,
proxy.Params.EtcdCfg.EtcdTLSCACert,
proxy.Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli)
assert.Nil(t, err)

View File

@ -92,7 +92,14 @@ func (s *Server) init() error {
datacoord.Params.DataCoordCfg.Port = Params.Port
datacoord.Params.DataCoordCfg.Address = Params.GetAddress()
etcdCli, err := etcd.GetEtcdClient(&datacoord.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("DataCoord connect to etcd failed", zap.Error(err))
return err

View File

@ -233,7 +233,14 @@ func (s *Server) init() error {
dn.Params.DataNodeCfg.Port = Params.Port
dn.Params.DataNodeCfg.IP = Params.IP
etcdCli, err := etcd.GetEtcdClient(&dn.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Error("failed to connect to etcd", zap.Error(err))
return err

View File

@ -41,8 +41,17 @@ func TestIndexCoordClient(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
server, err := grpcindexcoord.NewServer(ctx, factory)
assert.NoError(t, err)
indexcoord.Params.InitOnce()
icm := indexcoord.NewIndexCoordMock()
etcdCli, err := etcd.GetEtcdClient(&ClientParams.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
indexcoord.Params.EtcdCfg.UseEmbedEtcd,
indexcoord.Params.EtcdCfg.EtcdUseSSL,
indexcoord.Params.EtcdCfg.Endpoints,
indexcoord.Params.EtcdCfg.EtcdTLSCert,
indexcoord.Params.EtcdCfg.EtcdTLSKey,
indexcoord.Params.EtcdCfg.EtcdTLSCACert,
indexcoord.Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
icm.CallRegister = func() error {
session := sessionutil.NewSession(context.Background(), indexcoord.Params.EtcdCfg.MetaRootPath, etcdCli)

View File

@ -100,7 +100,14 @@ func (s *Server) init() error {
closer := trace.InitTracing("IndexCoord")
s.closer = closer
etcdCli, err := etcd.GetEtcdClient(&indexcoord.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("IndexCoord connect to etcd failed", zap.Error(err))
return err

View File

@ -134,7 +134,14 @@ func TestIndexNodeClient(t *testing.T) {
inm := indexnode.NewIndexNodeMock()
ParamsGlobal.InitOnce()
etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
ParamsGlobal.EtcdCfg.UseEmbedEtcd,
ParamsGlobal.EtcdCfg.EtcdUseSSL,
ParamsGlobal.EtcdCfg.Endpoints,
ParamsGlobal.EtcdCfg.EtcdTLSCert,
ParamsGlobal.EtcdCfg.EtcdTLSKey,
ParamsGlobal.EtcdCfg.EtcdTLSCACert,
ParamsGlobal.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
inm.SetEtcdClient(etcdCli)
err = ins.SetClient(inm)

View File

@ -156,7 +156,14 @@ func (s *Server) init() error {
return err
}
etcdCli, err := etcd.GetEtcdClient(&indexnode.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("IndexNode connect to etcd failed", zap.Error(err))
return err

View File

@ -44,7 +44,14 @@ func TestIndexNodeServer(t *testing.T) {
inm := indexnode.NewIndexNodeMock()
ParamsGlobal.InitOnce()
etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
ParamsGlobal.EtcdCfg.UseEmbedEtcd,
ParamsGlobal.EtcdCfg.EtcdUseSSL,
ParamsGlobal.EtcdCfg.Endpoints,
ParamsGlobal.EtcdCfg.EtcdTLSCert,
ParamsGlobal.EtcdCfg.EtcdTLSKey,
ParamsGlobal.EtcdCfg.EtcdTLSCACert,
ParamsGlobal.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
inm.SetEtcdClient(etcdCli)
err = server.SetClient(inm)

View File

@ -324,7 +324,14 @@ func (s *Server) init() error {
s.closer = closer
log.Info("init Proxy's tracer done", zap.String("service name", serviceName))
etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("Proxy connect to etcd failed", zap.Error(err))
return err

View File

@ -35,7 +35,14 @@ func Test_NewClient(t *testing.T) {
ctx := context.Background()
etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
proxy.Params.EtcdCfg.UseEmbedEtcd,
proxy.Params.EtcdCfg.EtcdUseSSL,
proxy.Params.EtcdCfg.Endpoints,
proxy.Params.EtcdCfg.EtcdTLSCert,
proxy.Params.EtcdCfg.EtcdTLSKey,
proxy.Params.EtcdCfg.EtcdTLSCACert,
proxy.Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli)
assert.Nil(t, err)

View File

@ -118,7 +118,14 @@ func (s *Server) init() error {
closer := trace.InitTracing("querycoord")
s.closer = closer
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("QueryCoord connect to etcd failed", zap.Error(err))
return err

View File

@ -105,7 +105,14 @@ func (s *Server) init() error {
log.Info("QueryNode", zap.Int("port", Params.Port))
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("QueryNode connect to etcd failed", zap.Error(err))
return err

View File

@ -35,7 +35,14 @@ func Test_NewClient(t *testing.T) {
proxy.Params.InitOnce()
ctx := context.Background()
etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
proxy.Params.EtcdCfg.UseEmbedEtcd,
proxy.Params.EtcdCfg.EtcdUseSSL,
proxy.Params.EtcdCfg.Endpoints,
proxy.Params.EtcdCfg.EtcdTLSCert,
proxy.Params.EtcdCfg.EtcdTLSKey,
proxy.Params.EtcdCfg.EtcdTLSCACert,
proxy.Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli)
assert.Nil(t, err)

View File

@ -162,7 +162,14 @@ func (s *Server) init() error {
closer := trace.InitTracing("root_coord")
s.closer = closer
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
log.Warn("RootCoord connect to etcd failed", zap.Error(err))
return err

View File

@ -178,7 +178,14 @@ func TestRun(t *testing.T) {
rootcoord.Params.Init()
rootcoord.Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
sessKey := path.Join(rootcoord.Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
_, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix())

View File

@ -51,7 +51,14 @@ func TestMockEtcd(t *testing.T) {
Params.InitOnce()
Params.EtcdCfg.MetaRootPath = "indexcoord-mock"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
@ -98,7 +105,14 @@ func testIndexCoord(t *testing.T) {
// first start an IndexNode
inm0 := indexnode.NewIndexNodeMock()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
// start IndexCoord

View File

@ -33,7 +33,6 @@ func TestEtcdConfigLoad(te *testing.T) {
param := new(paramtable.ServiceParam)
te.Setenv("etcd.use.embed", "true")
// TODO, not sure if the relative path works for ci environment
te.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml")
te.Setenv("etcd.data.dir", "etcd.test.data.dir")

View File

@ -40,7 +40,14 @@ func TestMain(m *testing.M) {
}
func TestEtcdKV_Load(te *testing.T) {
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
defer etcdCli.Close()
assert.NoError(te, err)
te.Run("EtcdKV SaveAndLoad", func(t *testing.T) {

View File

@ -50,7 +50,14 @@ func NewMetaKvFactory(rootPath string, etcdCfg *paramtable.EtcdConfig) (kv.MetaK
}
return metaKv, err
}
client, err := etcd.GetEtcdClient(etcdCfg)
client, err := etcd.GetEtcdClient(
etcdCfg.UseEmbedEtcd,
etcdCfg.EtcdUseSSL,
etcdCfg.Endpoints,
etcdCfg.EtcdTLSCert,
etcdCfg.EtcdTLSKey,
etcdCfg.EtcdTLSCACert,
etcdCfg.EtcdTLSMinVersion)
if err != nil {
return nil, err
}

View File

@ -48,7 +48,14 @@ func TestMetaSnapshot(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
@ -188,7 +195,14 @@ func TestGetRevOnEtcd(t *testing.T) {
tsKey := "timestamp"
key := path.Join(rootPath, tsKey)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
@ -232,7 +246,14 @@ func TestLoad(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
@ -280,7 +301,14 @@ func TestMultiSave(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
@ -344,7 +372,14 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
@ -422,7 +457,14 @@ func TestTsBackward(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
@ -449,7 +491,14 @@ func TestFix7150(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
tsKey := "timestamp"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()

View File

@ -263,7 +263,14 @@ func Test_SuffixSnapshotLoad(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
sep := "_ts"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
require.Nil(t, err)
defer etcdCli.Close()
etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -315,7 +322,14 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) {
Params.Init()
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
sep := "_ts"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
require.Nil(t, err)
defer etcdCli.Close()
etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -391,7 +405,14 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
sep := "_ts"
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
require.Nil(t, err)
defer etcdCli.Close()
etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath)

View File

@ -298,7 +298,14 @@ func runIndexNode(ctx context.Context, localMsg bool, alias string) *grpcindexno
panic(err)
}
wg.Done()
etcd, err := etcd.GetEtcdClient(&indexnode.Params.EtcdCfg)
etcd, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
panic(err)
}
@ -513,7 +520,14 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, proxy)
etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdcli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
defer etcdcli.Close()
assert.NoError(t, err)
proxy.SetEtcdClient(etcdcli)

View File

@ -44,7 +44,14 @@ func (suite *RowCountBasedBalancerTestSuite) SetupSuite() {
func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)
suite.broker = meta.NewMockBroker(suite.T())

View File

@ -44,7 +44,14 @@ func (suite *ChannelCheckerTestSuite) SetupSuite() {
func (suite *ChannelCheckerTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)

View File

@ -44,7 +44,14 @@ func (suite *SegmentCheckerTestSuite) SetupSuite() {
func (suite *SegmentCheckerTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)

View File

@ -117,7 +117,14 @@ func (suite *JobSuite) SetupSuite() {
func (suite *JobSuite) SetupTest() {
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)

View File

@ -68,7 +68,14 @@ func (suite *CollectionManagerSuite) SetupSuite() {
func (suite *CollectionManagerSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)
suite.store = NewMetaStore(suite.kv)

View File

@ -51,7 +51,14 @@ func (suite *ReplicaManagerSuite) SetupSuite() {
func (suite *ReplicaManagerSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)
suite.store = NewMetaStore(suite.kv)

View File

@ -145,7 +145,14 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.idAllocator = RandomIncrementIDAllocator()
log.Debug("create embedded etcd KV...")
config := GenerateEtcdConfig()
client, err := etcd.GetEtcdClient(&config)
client, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(client, Params.EtcdCfg.MetaRootPath+"-"+RandomMetaRootPath())
suite.Require().NoError(err)

View File

@ -107,7 +107,14 @@ func (suite *HandoffObserverTestSuit) SetupTest() {
suite.idAllocator = RandomIncrementIDAllocator()
log.Debug("create embedded etcd KV...")
config := GenerateEtcdConfig()
client, err := etcd.GetEtcdClient(&config)
client, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(client, Params.EtcdCfg.MetaRootPath+"-"+RandomMetaRootPath())
suite.Require().NoError(err)

View File

@ -49,7 +49,14 @@ func (suite *LeaderObserverTestSuite) SetupSuite() {
func (suite *LeaderObserverTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)

View File

@ -393,7 +393,14 @@ func newQueryCoord() (*Server, error) {
return nil, err
}
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
return nil, err
}

View File

@ -111,7 +111,14 @@ func (suite *ServiceSuite) SetupSuite() {
func (suite *ServiceSuite) SetupTest() {
config := params.GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)

View File

@ -118,7 +118,14 @@ func (suite *TaskSuite) SetupSuite() {
func (suite *TaskSuite) SetupTest() {
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(&config)
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd,
config.EtcdUseSSL,
config.Endpoints,
config.EtcdTLSCert,
config.EtcdTLSKey,
config.EtcdTLSCACert,
config.EtcdTLSMinVersion)
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath)

View File

@ -444,7 +444,14 @@ func TestImpl_isHealthy(t *testing.T) {
func TestImpl_ShowConfigurations(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()
@ -486,7 +493,14 @@ func TestImpl_GetMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()

View File

@ -37,7 +37,14 @@ func TestGetSystemInfoMetrics(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, etcdCli)
@ -64,7 +71,14 @@ func TestGetComponentConfigurationsFailed(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, etcdCli)

View File

@ -523,7 +523,14 @@ func genCollectionMeta(collectionID UniqueID, schema *schemapb.CollectionSchema)
// ---------- unittest util functions ----------
// functions of third-party
func genEtcdKV() (*etcdkv.EtcdKV, error) {
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
return nil, err
}
@ -1689,7 +1696,14 @@ func saveChangeInfo(key string, value string) error {
func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory) (*QueryNode, error) {
node := NewQueryNode(ctx, fac)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
return nil, err
}

View File

@ -82,7 +82,14 @@ func newQueryNodeMock() *QueryNode {
cancel()
}()
}
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
panic(err)
}
@ -171,7 +178,14 @@ func TestQueryNode_register(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdcli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdcli.Close()
node.SetEtcdClient(etcdcli)
@ -189,7 +203,16 @@ func TestQueryNode_init(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
defer node.Stop()
etcdcli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdcli.Close()
node.SetEtcdClient(etcdcli)

View File

@ -106,7 +106,14 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) {
core, err := NewCore(context.Background(), nil)
assert.Nil(t, err)
cli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
cli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
defer cli.Close()
assert.Nil(t, err)
core.etcdCli = cli
@ -130,7 +137,14 @@ func TestProxyClientManager_AddProxyClient(t *testing.T) {
core, err := NewCore(context.Background(), nil)
assert.Nil(t, err)
cli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
cli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer cli.Close()
core.etcdCli = cli

View File

@ -34,7 +34,14 @@ import (
func TestProxyManager(t *testing.T) {
Params.Init()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@ -103,7 +110,14 @@ func TestProxyManager(t *testing.T) {
func TestProxyManager_ErrCompacted(t *testing.T) {
Params.Init()
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.Nil(t, err)
defer etcdCli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

View File

@ -1332,7 +1332,14 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
ctx := context.Background()
coreFactory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()
core, err := NewCore(ctx, coreFactory)

View File

@ -21,7 +21,14 @@ import (
func getTestEtcdCli() *clientv3.Client {
Params.InitOnce()
cli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
cli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
if err != nil {
panic(err)
}

View File

@ -4,7 +4,6 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
@ -25,11 +24,17 @@ func GetEmbedEtcdClient() (*clientv3.Client, error) {
}
// InitEtcdServer initializes embedded etcd server singleton.
func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error {
if etcdCfg.UseEmbedEtcd {
func InitEtcdServer(
useEmbedEtcd bool,
configPath string,
dataDir string,
logPath string,
logLevel string,
) error {
if useEmbedEtcd {
var initError error
initOnce.Do(func() {
path := etcdCfg.ConfigPath
path := configPath
var cfg *embed.Config
if len(path) > 0 {
cfgFromFile, err := embed.ConfigFromFile(path)
@ -40,22 +45,26 @@ func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error {
} else {
cfg = embed.NewConfig()
}
cfg.Dir = etcdCfg.DataDir
cfg.LogOutputs = []string{etcdCfg.EtcdLogPath}
cfg.LogLevel = etcdCfg.EtcdLogLevel
cfg.Dir = dataDir
cfg.LogOutputs = []string{logPath}
cfg.LogLevel = logLevel
e, err := embed.StartEtcd(cfg)
if err != nil {
log.Error("failed to init embedded Etcd server", zap.Error(err))
initError = err
}
etcdServer = e
log.Info("finish init Etcd config", zap.String("path", path), zap.String("data", etcdCfg.DataDir))
log.Info("finish init Etcd config", zap.String("path", path), zap.String("data", dataDir))
})
return initError
}
return nil
}
func HasServer() bool {
return etcdServer != nil
}
// StopEtcdServer stops embedded etcd server singleton.
func StopEtcdServer() {
if etcdServer != nil {

View File

@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
"github.com/milvus-io/milvus/internal/util/paramtable"
clientv3 "go.etcd.io/etcd/client/v3"
)
@ -34,14 +33,21 @@ var (
)
// GetEtcdClient returns etcd client
func GetEtcdClient(cfg *paramtable.EtcdConfig) (*clientv3.Client, error) {
if cfg.UseEmbedEtcd {
func GetEtcdClient(
useEmbedEtcd bool,
useSSL bool,
endpoints []string,
certFile string,
keyFile string,
caCertFile string,
minVersion string) (*clientv3.Client, error) {
if useEmbedEtcd {
return GetEmbedEtcdClient()
}
if cfg.EtcdUseSSL {
return GetRemoteEtcdSSLClient(cfg.Endpoints, cfg.EtcdTLSCert, cfg.EtcdTLSKey, cfg.EtcdTLSCACert, cfg.EtcdTLSMinVersion)
if useSSL {
return GetRemoteEtcdSSLClient(endpoints, certFile, keyFile, caCertFile, minVersion)
}
return GetRemoteEtcdClient(cfg.Endpoints)
return GetRemoteEtcdClient(endpoints)
}
// GetRemoteEtcdClient returns client of remote etcd by given endpoints

View File

@ -19,26 +19,18 @@ package etcd
import (
"context"
"errors"
"os"
"path"
"testing"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
)
var Params paramtable.ServiceParam
func TestEtcd(t *testing.T) {
Params.Init()
Params.EtcdCfg.UseEmbedEtcd = true
Params.EtcdCfg.DataDir = "/tmp/data"
err := InitEtcdServer(&Params.EtcdCfg)
err := InitEtcdServer(true, "", "/tmp/data", "stdout", "info")
assert.NoError(t, err)
defer os.RemoveAll(Params.EtcdCfg.DataDir)
defer StopEtcdServer()
etcdCli, err := GetEtcdClient(&Params.EtcdCfg)
etcdCli, err := GetEtcdClient(true, false, []string{}, "", "", "", "")
assert.NoError(t, err)
key := path.Join("test", "test")
@ -50,26 +42,25 @@ func TestEtcd(t *testing.T) {
assert.False(t, resp.Count < 1)
assert.Equal(t, string(resp.Kvs[0].Value), "value")
Params.EtcdCfg.UseEmbedEtcd = false
Params.EtcdCfg.EtcdUseSSL = true
Params.EtcdCfg.EtcdTLSMinVersion = "1.3"
Params.EtcdCfg.EtcdTLSCACert = "../../../configs/cert/ca.pem"
Params.EtcdCfg.EtcdTLSCert = "../../../configs/cert/client.pem"
Params.EtcdCfg.EtcdTLSKey = "../../../configs/cert/client.key"
etcdCli, err = GetEtcdClient(&Params.EtcdCfg)
assert.NoError(t, err)
Params.EtcdCfg.EtcdTLSMinVersion = "some not right word"
etcdCli, err = GetEtcdClient(&Params.EtcdCfg)
etcdCli, err = GetEtcdClient(false, true, []string{},
"../../../configs/cert/client.pem",
"../../../configs/cert/client.key",
"../../../configs/cert/ca.pem",
"some not right word")
assert.NotNil(t, err)
Params.EtcdCfg.EtcdTLSMinVersion = "1.2"
Params.EtcdCfg.EtcdTLSCACert = "wrong/file"
etcdCli, err = GetEtcdClient(&Params.EtcdCfg)
etcdCli, err = GetEtcdClient(false, true, []string{},
"../../../configs/cert/client.pem",
"../../../configs/cert/client.key",
"wrong/file",
"1.2")
assert.NotNil(t, err)
Params.EtcdCfg.EtcdTLSCACert = "../../../configs/cert/ca.pem"
Params.EtcdCfg.EtcdTLSCert = "wrong/file"
etcdCli, err = GetEtcdClient(false, true, []string{},
"wrong/file",
"../../../configs/cert/client.key",
"../../../configs/cert/ca.pem",
"1.2")
assert.NotNil(t, err)
}

View File

@ -23,6 +23,7 @@ import (
config "github.com/milvus-io/milvus/internal/config"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
@ -137,26 +138,38 @@ func (gp *BaseTable) initConfigsFromLocal(formatter func(key string) string) {
}
func (gp *BaseTable) initConfigsFromRemote(formatter func(key string) string) {
endpoints, err := gp.mgr.GetConfig("etcd.endpoints")
_, err := gp.mgr.GetConfig("etcd.endpoints")
if err != nil {
log.Info("cannot find etcd.endpoints")
return
}
rootPath, err := gp.mgr.GetConfig("etcd.rootPath")
_, err = gp.mgr.GetConfig("etcd.rootPath")
if err != nil {
log.Info("cannot find etcd.rootPath")
return
}
etcdConfig := EtcdConfig{}
etcdConfig.init(gp)
if etcdConfig.UseEmbedEtcd && !etcd.HasServer() {
return
}
info := &config.EtcdInfo{
UseEmbed: etcdConfig.UseEmbedEtcd,
UseSSL: etcdConfig.EtcdUseSSL,
Endpoints: etcdConfig.Endpoints,
CertFile: etcdConfig.EtcdTLSCert,
KeyFile: etcdConfig.EtcdTLSKey,
CaCertFile: etcdConfig.EtcdTLSCACert,
MinVersion: etcdConfig.EtcdTLSMinVersion,
KeyPrefix: etcdConfig.MetaRootPath,
RefreshMode: config.ModeInterval,
RefreshInterval: 10 * time.Second,
}
configFilePath := gp.configDir + "/" + gp.YamlFile
gp.mgr, err = config.Init(config.WithEnvSource(formatter),
config.WithFilesSource(configFilePath),
config.WithEtcdSource(&config.EtcdInfo{
Endpoints: strings.Split(endpoints, ","),
KeyPrefix: rootPath,
RefreshMode: config.ModeInterval,
RefreshInterval: 10 * time.Second,
}))
config.WithEtcdSource(info))
if err != nil {
log.Info("init with etcd failed", zap.Error(err))
return