Add SessionManager (#5288)

Add SessionManager.
Resolves: #5174 

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5277/head
godchen 2021-05-19 18:36:05 +08:00 committed by GitHub
parent 3db653f2d0
commit e224d1e725
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 296 additions and 644 deletions

View File

@ -16,7 +16,6 @@ package datanode
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
@ -26,13 +25,10 @@ import (
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -77,13 +73,6 @@ type DataNode struct {
flushChan chan<- *flushMsg
replica Replica
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
closer io.Closer
msFactory msgstream.Factory
@ -144,31 +133,6 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
// At last, data node initializes its `dataSyncService` and `metaService`.
func (node *DataNode) Init() error {
ctx := context.Background()
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
node.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := node.registerService(fmt.Sprintf("datanode-%d", Params.NodeID), Params.IP)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
req := &datapb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
SourceID: node.NodeID,
@ -326,32 +290,3 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
Value: "",
}, nil
}
func (node *DataNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := node.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
node.session.NodeName = nodeName
node.session.IP = ip
node.session.LeaseID = respID
sessionJSON, err := json.Marshal(node.session)
if err != nil {
return nil, err
}
err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := node.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -12,7 +12,6 @@ package dataservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
@ -65,12 +64,7 @@ type Server struct {
masterClient types.MasterService
ttMsgStream msgstream.MsgStream
k2sMsgStream msgstream.MsgStream
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
ddChannelMu struct {
ddChannelMu struct {
sync.Mutex
name string
}
@ -110,21 +104,6 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) {
}
func (s *Server) Init() error {
if err := s.initMeta(); err != nil {
return err
}
ch, err := s.registerService(fmt.Sprintf("dataservice-%d", Params.NodeID), "localhost:123456")
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
return nil
}
@ -139,6 +118,10 @@ func (s *Server) Start() error {
return err
}
if err := s.initMeta(); err != nil {
return err
}
s.allocator = newAllocator(s.masterClient)
s.statsHandler = newStatsHandler(s.meta)
@ -900,33 +883,3 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
resp.Infos = infos
return resp, nil
}
func (s *Server) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := s.kvClient.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
s.session.NodeName = nodeName
s.session.IP = ip
s.session.LeaseID = respID
sessionJSON, err := json.Marshal(s.session)
if err != nil {
return nil, err
}
err = s.kvClient.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := s.kvClient.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -22,6 +22,7 @@ import (
"sync"
"time"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -39,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
@ -55,6 +57,9 @@ type Server struct {
masterService types.MasterService
dataService types.DataService
etcdKV *etcdkv.EtcdKV
signal <-chan bool
newMasterServiceClient func(string) (types.MasterService, error)
newDataServiceClient func(string) types.DataService
@ -169,6 +174,11 @@ func (s *Server) init() error {
addr := Params.IP + ":" + strconv.Itoa(Params.Port)
log.Debug("DataNode address", zap.String("address", addr))
self := sessionutil.NewSession("datanode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false)
sm := sessionutil.NewSessionManager(ctx, dn.Params.EtcdAddress, dn.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
err := s.startGrpc()
if err != nil {
return err

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
@ -87,6 +88,13 @@ func (s *Server) init() error {
closer := trace.InitTracing("data_service")
s.closer = closer
dataservice.Params.Init()
self := sessionutil.NewSession("dataservice", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), true)
sm := sessionutil.NewSessionManager(ctx, dataservice.Params.EtcdAddress, dataservice.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
err := s.startGrpc()
if err != nil {
return err
@ -114,7 +122,6 @@ func (s *Server) init() error {
s.dataService.SetMasterClient(masterServiceClient)
}
dataservice.Params.Init()
if err := s.dataService.Init(); err != nil {
log.Error("dataService init error", zap.Error(err))
return err

View File

@ -24,6 +24,7 @@ import (
grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
"github.com/milvus-io/milvus/internal/indexnode"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
@ -43,6 +45,9 @@ type Server struct {
grpcServer *grpc.Server
grpcErrChan chan error
etcdKV *etcdkv.EtcdKV
signal <-chan bool
indexServiceClient types.IndexService
loopCtx context.Context
loopCancel func()
@ -95,6 +100,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
func (s *Server) init() error {
ctx := context.Background()
var err error
Params.Init()
if !funcutil.CheckPortAvailable(Params.Port) {
@ -114,6 +120,11 @@ func (s *Server) init() error {
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
self := sessionutil.NewSession("indexnode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false)
sm := sessionutil.NewSessionManager(ctx, indexnode.Params.EtcdAddress, indexnode.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
defer func() {
if err != nil {
err = s.Stop()

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
otgrpc "github.com/opentracing-contrib/go-grpc"
@ -64,12 +65,18 @@ func (s *Server) Run() error {
}
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
indexservice.Params.Init()
closer := trace.InitTracing("index_service")
s.closer = closer
self := sessionutil.NewSession("indexservice", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.ServicePort), true)
sm := sessionutil.NewSessionManager(ctx, indexservice.Params.EtcdAddress, indexservice.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
s.loopWg.Add(1)
go s.startGrpcLoop(Params.ServicePort)
// wait for grpc indexservice loop start

View File

@ -856,6 +856,12 @@ func TestRun(t *testing.T) {
}
Params.Port = rand.Int()%100 + 10000
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
cms.Params.Init()
cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal)
err = svr.Run()
assert.Nil(t, err)

View File

@ -29,6 +29,7 @@ import (
isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
psc "github.com/milvus-io/milvus/internal/distributed/proxyservice/client"
qsc "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
cms "github.com/milvus-io/milvus/internal/masterservice"
"github.com/milvus-io/milvus/internal/msgstream"
@ -40,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/masterpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
// grpc wrapper
@ -68,6 +70,9 @@ type Server struct {
connectIndexService bool
connectQueryService bool
etcdKV *etcdkv.EtcdKV
signal <-chan bool
closer io.Closer
}
@ -130,6 +135,11 @@ func (s *Server) init() error {
log.Debug("init params done")
self := sessionutil.NewSession("masterservice", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), true)
sm := sessionutil.NewSessionManager(ctx, cms.Params.EtcdAddress, cms.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
err := s.startGrpc()
if err != nil {
return err

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proxynode"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
)
@ -164,6 +165,11 @@ func (s *Server) init() error {
}
}()
self := sessionutil.NewSession("proxynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false)
sm := sessionutil.NewSessionManager(ctx, proxynode.Params.EtcdAddress, proxynode.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
qn "github.com/milvus-io/milvus/internal/querynode"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -91,6 +92,11 @@ func (s *Server) init() error {
closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort))
s.closer = closer
self := sessionutil.NewSession("querynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.QueryNodePort), false)
sm := sessionutil.NewSessionManager(ctx, qn.Params.EtcdAddress, qn.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort))
s.wg.Add(1)
go s.startGrpcLoop(Params.QueryNodePort)

View File

@ -27,6 +27,7 @@ import (
qs "github.com/milvus-io/milvus/internal/queryservice"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
@ -95,6 +96,11 @@ func (s *Server) init() error {
closer := trace.InitTracing("query_service")
s.closer = closer
self := sessionutil.NewSession("querynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false)
sm := sessionutil.NewSessionManager(ctx, qs.Params.EtcdAddress, qs.Params.MetaRootPath, self)
sm.Init()
sessionutil.SetGlobalSessionManager(sm)
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start

View File

@ -13,9 +13,7 @@ package indexnode
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"time"
@ -23,7 +21,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -32,9 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
const (
@ -60,13 +55,6 @@ type IndexNode struct {
startCallbacks []func()
closeCallbacks []func()
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
closer io.Closer
}
@ -89,32 +77,7 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
func (i *IndexNode) Init() error {
ctx := context.Background()
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
i.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := i.registerService(fmt.Sprintf("indexnode-%d", Params.NodeID), Params.IP)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
if err != nil {
return err
}
@ -301,32 +264,3 @@ func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringR
},
}, nil
}
func (i *IndexNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := i.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
i.session.NodeName = nodeName
i.session.IP = ip
i.session.LeaseID = respID
sessionJSON, err := json.Marshal(i.session)
if err != nil {
return nil, err
}
err = i.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := i.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -13,9 +13,7 @@ package indexservice
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"time"
@ -65,13 +63,6 @@ type IndexService struct {
nodeLock sync.RWMutex
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -99,31 +90,19 @@ func (i *IndexService) Init() error {
if err != nil {
return err
}
i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
metakv, err := NewMetaTable(i.etcdKV)
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
metakv, err := NewMetaTable(etcdKV)
if err != nil {
return err
}
i.metaTable = metakv
return nil
return err
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := i.registerService("indexservice", Params.Address)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
//init idAllocator
kvRootPath := Params.KvRootPath
i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, kvRootPath, "index_gid"))
@ -436,32 +415,3 @@ func (i *IndexService) dropIndexLoop() {
}
}
}
func (i *IndexService) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := i.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
i.session.NodeName = nodeName
i.session.IP = ip
i.session.LeaseID = respID
sessionJSON, err := json.Marshal(i.session)
if err != nil {
return nil, err
}
err = i.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := i.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -145,12 +145,6 @@ type Core struct {
//isInit atomic.Value
msFactory ms.Factory
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
}
// --------------------- function --------------------------
@ -844,19 +838,6 @@ func (c *Core) Init() error {
return
}
ch, err := c.registerService("masterservice", "localhost")
if err != nil {
return
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid"))
if initError = idAllocator.Initialize(); initError != nil {
return
@ -1649,34 +1630,3 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste
Count: in.Count,
}, nil
}
func (c *Core) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := c.etcdCli.Grant(c.ctx, 5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
c.session.NodeName = nodeName
c.session.IP = ip
c.session.LeaseID = respID.ID
sessionJSON, err := json.Marshal(c.session)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(c.ctx, RequestTimeout)
defer cancel()
_, err = c.etcdCli.Put(ctx, fmt.Sprintf("%s/node/%s", Params.MetaRootPath, nodeName), string(sessionJSON), clientv3.WithLease(respID.ID))
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := c.etcdCli.KeepAlive(c.ctx, respID.ID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -13,9 +13,7 @@ package proxynode
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
@ -24,7 +22,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -32,9 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
type UniqueID = typeutil.UniqueID
@ -67,13 +62,6 @@ type ProxyNode struct {
queryMsgStream msgstream.MsgStream
msFactory msgstream.Factory
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
@ -98,35 +86,7 @@ func (node *ProxyNode) Init() error {
// todo wait for proxyservice state changed to Healthy
ctx := context.Background()
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
return err
}
node.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
if err != nil {
return err
}
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := node.registerService("proxynode", Params.NetworkAddress)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
err = funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200)
err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200)
if err != nil {
return err
}
@ -336,32 +296,3 @@ func (node *ProxyNode) SetProxyServiceClient(cli types.ProxyService) {
func (node *ProxyNode) SetQueryServiceClient(cli types.QueryService) {
node.queryService = cli
}
func (node *ProxyNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := node.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
node.session.NodeName = nodeName
node.session.IP = ip
node.session.LeaseID = respID
sessionJSON, err := json.Marshal(node.session)
if err != nil {
return nil, err
}
err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := node.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -26,7 +26,6 @@ import "C"
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
@ -36,15 +35,12 @@ import (
"go.uber.org/zap"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/clientv3"
)
type QueryNode struct {
@ -70,13 +66,6 @@ type QueryNode struct {
indexService types.IndexService
dataService types.DataService
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
msFactory msgstream.Factory
scheduler *taskScheduler
}
@ -127,31 +116,6 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
func (node *QueryNode) Init() error {
ctx := context.Background()
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
node.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := node.registerService(fmt.Sprintf("querynode-%d", Params.QueryNodeID), Params.QueryNodeIP)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
C.SegcoreInit()
registerReq := &queryPb.RegisterNodeRequest{
Base: &commonpb.MsgBase{
@ -316,32 +280,3 @@ func (node *QueryNode) removeDataSyncService(collectionID UniqueID) {
defer node.dsServicesMu.Unlock()
delete(node.dataSyncServices, collectionID)
}
func (node *QueryNode) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := node.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
node.session.NodeName = nodeName
node.session.IP = ip
node.session.LeaseID = respID
sessionJSON, err := json.Marshal(node.session)
if err != nil {
return nil, err
}
err = node.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := node.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -13,21 +13,16 @@ package queryservice
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
type Timestamp = typeutil.Timestamp
@ -55,42 +50,10 @@ type QueryService struct {
isInit atomic.Value
enableGrpc bool
etcdKV *etcdkv.EtcdKV
session struct {
NodeName string
IP string
LeaseID clientv3.LeaseID
}
msFactory msgstream.Factory
}
func (qs *QueryService) Init() error {
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
qs.etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.MetaRootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
ch, err := qs.registerService(fmt.Sprintf("queryservice-%d", Params.QueryServiceID), Params.Address)
if err != nil {
return err
}
go func() {
for {
for range ch {
//TODO process lesase response
}
}
}()
return nil
}
@ -142,32 +105,3 @@ func (qs *QueryService) SetMasterService(masterService types.MasterService) {
func (qs *QueryService) SetDataService(dataService types.DataService) {
qs.dataServiceClient = dataService
}
func (qs *QueryService) registerService(nodeName string, ip string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := qs.etcdKV.Grant(5)
if err != nil {
fmt.Printf("grant error %s\n", err)
return nil, err
}
qs.session.NodeName = nodeName
qs.session.IP = ip
qs.session.LeaseID = respID
sessionJSON, err := json.Marshal(qs.session)
if err != nil {
return nil, err
}
err = qs.etcdKV.SaveWithLease(fmt.Sprintf("/node/%s", nodeName), string(sessionJSON), respID)
if err != nil {
fmt.Printf("put lease error %s\n", err)
return nil, err
}
ch, err := qs.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
}
return ch, nil
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
@ -19,6 +20,7 @@ import (
const defaultServiceRoot = "/services/"
const defaultIDKey = "id"
const defaultRetryTimes = 30
const defaultTTL = 10
// Session is a struct to store service's session, including ServerID, ServerName,
// Address.
@ -27,54 +29,103 @@ type Session struct {
ServerID int64
ServerName string
Address string
Exclusive bool
LeaseID clientv3.LeaseID
}
var (
globalServerID = int64(-1)
globalSessionManager = &SessionManager{}
)
// SessionManager is a struct to help store other service's session.
// including ServerID, ServerName, Address.
// It can fetch up-to-date sessions' information and watch service up and down.
type SessionManager struct {
ctx context.Context
etcdKV *etcdkv.EtcdKV
Self *Session
Sessions sync.Map
}
// NewSession is a helper to build Session object.LeaseID will be assigned after
// registeration.
func NewSession(serverID int64, serverName, address string) *Session {
func NewSession(serverName, address string, exclusive bool) *Session {
return &Session{
ServerName: serverName,
Address: address,
Exclusive: exclusive,
}
}
// NewSessionManager is a helper to build SessionManager object.
func NewSessionManager(ctx context.Context, etcdAddress string, etcdPath string, self *Session) *SessionManager {
etcdKV, err := initEtcd(etcdAddress, etcdPath)
if err != nil {
return nil
}
return &SessionManager{
ctx: ctx,
etcdKV: etcdKV,
Self: self,
}
}
// Init will initialize base struct in the SessionManager, including getServerID,
// and process keepAliveResponse
func (sm *SessionManager) Init() {
sm.checkIDExist()
serverID, err := sm.getServerID()
if err != nil {
panic(err)
}
sm.Self.ServerID = serverID
ch, err := sm.registerService()
if err != nil {
panic(err)
}
sm.processKeepAliveResponse(ch)
}
// NewSession is a helper to build Session object.LeaseID will be assigned after
// registeration.
func NewSessionWithID(serverID int64, serverName, address string, exclusive bool) *Session {
return &Session{
ServerID: serverID,
ServerName: serverName,
Address: address,
Exclusive: exclusive,
}
}
// GlobalServerID returns [singleton] ServerID.
// Before SetGlobalServerID, GlobalServerID() returns -1
func GlobalServerID() int64 {
return globalServerID
func GlobalSessionManager() *SessionManager {
return globalSessionManager
}
// SetGlobalServerID sets the [singleton] ServerID. ServerID returned by
// GlobalServerID(). Those who use GlobalServerID should call SetGlobalServerID()
// as early as possible in main() before use ServerID.
func SetGlobalServerID(id int64) {
globalServerID = id
func SetGlobalSessionManager(sm *SessionManager) {
globalSessionManager = sm
}
// GetServerID gets id from etcd with key: metaRootPath + "/services/id"
// Each server get ServerID and add one to id.
func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {
return getServerIDWithKey(etcd, defaultIDKey, defaultRetryTimes)
func (sm *SessionManager) getServerID() (int64, error) {
return sm.getServerIDWithKey(defaultIDKey, defaultRetryTimes)
}
func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, error) {
res := int64(-1)
func (sm *SessionManager) checkIDExist() {
sm.etcdKV.CompareVersionAndSwap(defaultServiceRoot+defaultIDKey, 0, "1")
}
func (sm *SessionManager) getServerIDWithKey(key string, retryTimes int) (int64, error) {
res := int64(0)
getServerIDWithKeyFn := func() error {
value, err := etcd.Load(defaultServiceRoot + key)
log.Debug("session", zap.String("get serverid", value))
value, err := sm.etcdKV.Load(defaultServiceRoot + key)
if err != nil {
err = etcd.CompareVersionAndSwap(defaultServiceRoot+key, 0, "1")
if err != nil {
log.Debug("session", zap.Error(err))
return err
}
res = 0
return nil
}
valueInt, err := strconv.ParseInt(value, 10, 64)
@ -82,7 +133,7 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64,
log.Debug("session", zap.Error(err))
return err
}
err = etcd.CompareValueAndSwap(defaultServiceRoot+key, value,
err = sm.etcdKV.CompareValueAndSwap(defaultServiceRoot+key, value,
strconv.FormatInt(valueInt+1, 10))
if err != nil {
log.Debug("session", zap.Error(err))
@ -110,30 +161,30 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64,
// MetaRootPath is configurable in the config file.
// Exclusive means whether this service can exist two at the same time, if so,
// it is false. Otherwise, set it to true.
func RegisterService(etcdKV *etcdkv.EtcdKV, exclusive bool, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := etcdKV.Grant(ttl)
func (sm *SessionManager) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := sm.etcdKV.Grant(defaultTTL)
if err != nil {
log.Error("register service", zap.Error(err))
return nil, err
}
session.LeaseID = respID
sm.Self.LeaseID = respID
sessionJSON, err := json.Marshal(session)
sessionJSON, err := json.Marshal(sm.Self)
if err != nil {
return nil, err
}
key := defaultServiceRoot + session.ServerName
if !exclusive {
key = key + "-" + strconv.FormatInt(session.ServerID, 10)
key := defaultServiceRoot + sm.Self.ServerName
if !sm.Self.Exclusive {
key = key + "-" + strconv.FormatInt(sm.Self.ServerID, 10)
}
err = etcdKV.CompareVersionAndSwap(key, 0, string(sessionJSON), clientv3.WithLease(respID))
err = sm.etcdKV.CompareVersionAndSwap(key, 0, string(sessionJSON), clientv3.WithLease(respID))
if err != nil {
fmt.Printf("compare and swap error %s\n. maybe the key has registered", err)
return nil, err
}
ch, err := etcdKV.KeepAlive(respID)
ch, err := sm.etcdKV.KeepAlive(respID)
if err != nil {
fmt.Printf("keep alive error %s\n", err)
return nil, err
@ -143,56 +194,60 @@ func RegisterService(etcdKV *etcdkv.EtcdKV, exclusive bool, session *Session, tt
// ProcessKeepAliveResponse processes the response of etcd keepAlive interface
// If keepAlive fails for unexpected error, it will send a signal to the channel.
func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) (signal <-chan bool) {
signalOut := make(chan bool)
func (sm *SessionManager) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) {
go func() {
for {
select {
case <-ctx.Done():
case <-sm.ctx.Done():
log.Error("keep alive", zap.Error(errors.New("context done")))
return
case resp, ok := <-ch:
if !ok {
signalOut <- false
panic("keepAlive with etcd failed")
}
if resp != nil {
signalOut <- true
} else {
signalOut <- false
if resp == nil {
panic("keepAlive with etcd failed")
}
}
}
}()
return signalOut
}
// GetSessions gets all the services registered in etcd.
// This gets all the key with prefix metaRootPath + "/services/" + prefix
// For general, "datanode" to get all datanodes
func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {
sessions := make([]*Session, 0)
_, resValue, err := etcdKV.LoadWithPrefix(defaultServiceRoot + prefix)
// UpdateSessions will update local sessions same as the sessions saved in etcd.
// It makes locally stored sessions up-to-date.
func (sm *SessionManager) UpdateSessions(prefix string) error {
resKey, resValue, err := sm.etcdKV.LoadWithPrefix(defaultServiceRoot + prefix)
if err != nil {
return nil, err
return err
}
for _, value := range resValue {
for i := 0; i < len(resKey); i++ {
session := &Session{}
err = json.Unmarshal([]byte(value), session)
err = json.Unmarshal([]byte(resValue[i]), session)
if err != nil {
return nil, err
return err
}
sessions = append(sessions, session)
sm.Sessions.Store(resKey[i], session)
}
return sessions, nil
return nil
}
// WatchServices watch all events in etcd.
// If a server register, a session will be sent to addChannel
// If a server offline, a session will be sent to deleteChannel
func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) {
addCh := make(chan *Session, 10)
deleteCh := make(chan *Session, 10)
rch := etcdKV.WatchWithPrefix(defaultServiceRoot + prefix)
// GetSessions gets all the services saved in memory.
// Before GetSessions, you should WatchServices or UpdateSessions first.
func (sm *SessionManager) GetSessions() map[string]*Session {
sessions := map[string]*Session{}
sm.Sessions.Range(func(key, value interface{}) bool {
sessions[fmt.Sprint(key)] = value.(*Session)
return true
})
return sessions
}
// WatchServices watch the service's up and down in etcd, and saves it into local
// sessions. If a server up, it will be add to sessions. But it won't get the
// sessions startup before watch start.
// UpdateSessions and WatchServices is recommended.
func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) {
rch := sm.etcdKV.WatchWithPrefix(defaultServiceRoot + prefix)
go func() {
for {
select {
@ -203,26 +258,42 @@ func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (a
return
}
for _, ev := range wresp.Events {
session := &Session{}
err := json.Unmarshal([]byte(ev.Kv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
switch ev.Type {
case mvccpb.PUT:
log.Debug("watch services",
zap.Any("addchannel kv", ev.Kv))
addCh <- session
zap.Any("add kv", ev.Kv))
session := &Session{}
err := json.Unmarshal([]byte(ev.Kv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
sm.Sessions.Store(string(ev.Kv.Key), session)
case mvccpb.DELETE:
log.Debug("watch services",
zap.Any("deletechannel kv", ev.Kv))
deleteCh <- session
zap.Any("delete kv", ev.Kv))
sm.Sessions.Delete(string(ev.Kv.Key))
}
}
}
}
}()
return addCh, deleteCh
}
func initEtcd(etcdAddress, rootPath string) (*etcdkv.EtcdKV, error) {
var etcdKV *etcdkv.EtcdKV
connectEtcdFn := func() error {
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second})
if err != nil {
return err
}
etcdKV = etcdkv.NewEtcdKV(etcdCli, rootPath)
return nil
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return nil, err
}
return etcdKV, nil
}

View File

@ -2,8 +2,10 @@ package sessionutil
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -14,7 +16,10 @@ import (
var Params paramtable.BaseTable
func TestGetServerID(t *testing.T) {
func TestGetServerIDConcurrently(t *testing.T) {
ctx := context.Background()
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr, err := Params.Load("_EtcdAddress")
@ -24,7 +29,7 @@ func TestGetServerID(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
rootPath := fmt.Sprintf("/%d/test/meta", randVal)
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
@ -33,10 +38,13 @@ func TestGetServerID(t *testing.T) {
var wg sync.WaitGroup
var muList sync.Mutex = sync.Mutex{}
self := NewSession("test", "testAddr", false)
sm := NewSessionManager(ctx, etcdAddr, rootPath, self)
res := make([]int64, 0)
getIDFunc := func() {
id, err := GetServerID(etcdKV)
sm.checkIDExist()
id, err := sm.getServerID()
assert.Nil(t, err)
muList.Lock()
res = append(res, id)
@ -49,16 +57,17 @@ func TestGetServerID(t *testing.T) {
go getIDFunc()
}
wg.Wait()
for i := 0; i < 10; i++ {
for i := 1; i <= 10; i++ {
assert.Contains(t, res, int64(i))
}
}
func TestRegister(t *testing.T) {
func TestInit(t *testing.T) {
ctx := context.Background()
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
@ -67,94 +76,75 @@ func TestRegister(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
rootPath := fmt.Sprintf("/%d/test/meta", randVal)
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
addChannel, deletechannel := WatchServices(ctx, etcdKV, "test")
for i := 0; i < 10; i++ {
id, err := GetServerID(etcdKV)
assert.Nil(t, err)
session := NewSession(id, "test", "localhost")
_, err = RegisterService(etcdKV, false, session, 10)
assert.Nil(t, err)
sessionReturn := <-addChannel
assert.Equal(t, sessionReturn, session)
self := NewSession("test", "testAddr", false)
sm := NewSessionManager(ctx, etcdAddr, rootPath, self)
sm.Init()
assert.NotEqual(t, 0, sm.Self.LeaseID)
assert.NotEqual(t, 0, sm.Self.ServerID)
}
func TestUpdateSessions(t *testing.T) {
ctx := context.Background()
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
sessions, err := GetSessions(etcdKV, "test")
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := fmt.Sprintf("/%d/test/meta", randVal)
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
var wg sync.WaitGroup
var muList sync.Mutex = sync.Mutex{}
self := NewSession("test", "testAddr", false)
sm := NewSessionManager(ctx, etcdAddr, rootPath, self)
sm.WatchServices(ctx, "test")
err = sm.UpdateSessions("test")
assert.Nil(t, err)
sessionManagers := make([]*SessionManager, 0)
getIDFunc := func() {
service := NewSession("test", "testAddr", false)
singleManager := NewSessionManager(ctx, etcdAddr, rootPath, service)
singleManager.Init()
muList.Lock()
sessionManagers = append(sessionManagers, singleManager)
muList.Unlock()
wg.Done()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go getIDFunc()
}
wg.Wait()
assert.Equal(t, len(sm.GetSessions()), 10)
sessions := sm.GetSessions()
assert.Nil(t, err)
assert.Equal(t, len(sessions), 10)
for i := 10; i < 10; i++ {
assert.Equal(t, sessions[i].ServerID, int64(i))
err = etcdKV.Remove(fmt.Sprintf("test-%d", i))
assert.Nil(t, err)
sessionReturn := <-deletechannel
assert.Equal(t, sessionReturn, sessions[i])
}
}
func TestRegisterExclusive(t *testing.T) {
Params.Init()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
id, err := GetServerID(etcdKV)
assert.Nil(t, err)
session := NewSession(id, "test", "localhost")
_, err = RegisterService(etcdKV, true, session, 10)
assert.Nil(t, err)
id, err = GetServerID(etcdKV)
assert.Nil(t, err)
session = NewSession(id, "test", "helloworld")
_, err = RegisterService(etcdKV, true, session, 10)
assert.NotNil(t, err)
}
func TestKeepAlive(t *testing.T) {
Params.Init()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
id, err := GetServerID(etcdKV)
assert.Nil(t, err)
session := NewSession(id, "test", "localhost")
ch, err := RegisterService(etcdKV, false, session, 10)
assert.Nil(t, err)
aliveCh := ProcessKeepAliveResponse(ctx, ch)
signal := <-aliveCh
assert.Equal(t, signal, true)
sessions, err := GetSessions(etcdKV, "test")
assert.Nil(t, err)
assert.Equal(t, len(sessions), 1)
assert.Equal(t, sessions[0].ServerID, int64(0))
etcdKV.RemoveWithPrefix("")
assert.Eventually(t, func() bool {
return len(sm.GetSessions()) == 0
}, 10*time.Second, 100*time.Millisecond)
}