Fix GetSession error (#5401)

Fix GetSession error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5411/head
godchen 2021-05-25 20:15:46 +08:00 committed by GitHub
parent 6766169878
commit 200801271d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 66 additions and 37 deletions

View File

@ -50,7 +50,7 @@ type Client struct {
func getDataNodeAddress(sess *sessionutil.Session, serverID int64) (string, error) {
key := typeutil.DataNodeRole + "-" + strconv.FormatInt(serverID, 10)
msess, err := sess.GetSessions(key)
msess, _, err := sess.GetSessions(key)
if err != nil {
return "", err
}

View File

@ -43,7 +43,7 @@ type GrpcClient struct {
func getMasterServiceAddr(sess *sessionutil.Session) (string, error) {
key := typeutil.MasterServiceRole
msess, err := sess.GetSessions(key)
msess, _, err := sess.GetSessions(key)
if err != nil {
return "", err
}

View File

@ -16,10 +16,20 @@ import (
"go.uber.org/zap"
)
const DefaultServiceRoot = "/session/"
const DefaultIDKey = "id"
const DefaultRetryTimes = 30
const DefaultTTL = 10
const (
DefaultServiceRoot = "/session/"
DefaultIDKey = "id"
DefaultRetryTimes = 30
DefaultTTL = 10
)
type SessionEventType int
const (
SessionNoneEvent SessionEventType = iota
SessionAddEvent
SessionDelEvent
)
// Session is a struct to store service's session, including ServerID, ServerName,
// Address.
@ -36,6 +46,11 @@ type Session struct {
cancel context.CancelFunc
}
type SessionEvent struct {
EventType SessionEventType
Session *Session
}
// NewSession is a helper to build Session object.
// ServerID and LeaseID will be assigned after registeration.
// etcdCli is initialized when NewSession
@ -222,33 +237,33 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
}
// GetSessions will get all sessions registered in etcd.
func (s *Session) GetSessions(prefix string) (map[string]*Session, error) {
func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) {
res := make(map[string]*Session)
key := path.Join(DefaultServiceRoot, prefix)
resp, err := s.etcdCli.Get(s.ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, err
return nil, 0, err
}
for _, kv := range resp.Kvs {
session := &Session{}
err = json.Unmarshal([]byte(kv.Value), session)
if err != nil {
return nil, err
return nil, 0, err
}
res[string(kv.Key)] = session
_, mapKey := path.Split(string(kv.Key))
res[mapKey] = session
}
return res, nil
return res, resp.Header.Revision, nil
}
// WatchServices watch the service's up and down in etcd, and saves it into local
// sessions.
// If a server up, it will be add to addChannel.
// If a server is offline, it will be add to delChannel.
func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) {
addCh := make(chan *Session, 10)
delCh := make(chan *Session, 10)
rch := s.etcdCli.Watch(s.ctx, path.Join(DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV())
func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-chan *SessionEvent) {
eventCh := make(chan *SessionEvent, 100)
rch := s.etcdCli.Watch(s.ctx, path.Join(DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
go func() {
for {
select {
@ -259,6 +274,8 @@ func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delC
return
}
for _, ev := range wresp.Events {
session := &Session{}
var eventType SessionEventType
switch ev.Type {
case mvccpb.PUT:
log.Debug("watch services",
@ -269,24 +286,27 @@ func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delC
log.Error("watch services", zap.Error(err))
continue
}
addCh <- session
eventType = SessionAddEvent
case mvccpb.DELETE:
log.Debug("watch services",
zap.Any("delete kv", ev.PrevKv))
session := &Session{}
err := json.Unmarshal([]byte(ev.PrevKv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
delCh <- session
eventType = SessionDelEvent
}
eventCh <- &SessionEvent{
EventType: eventType,
Session: session,
}
}
}
}
}()
return addCh, delCh
return eventCh
}
func initEtcd(etcdAddress string) (*clientv3.Client, error) {

View File

@ -1,6 +1,7 @@
package sessionutil
import (
"strconv"
"sync"
"testing"
"time"
@ -78,9 +79,12 @@ func TestInit(t *testing.T) {
defer etcdKV.RemoveWithPrefix("")
s := NewSession(ctx, []string{etcdAddr})
s.Init("test", "testAddr", false)
s.Init("inittest", "testAddr", false)
assert.NotEqual(t, int64(0), s.leaseID)
assert.NotEqual(t, int64(0), s.ServerID)
sessions, _, err := s.GetSessions("inittest")
assert.Nil(t, err)
assert.Contains(t, sessions, "inittest-"+strconv.FormatInt(s.ServerID, 10))
}
func TestUpdateSessions(t *testing.T) {
@ -106,10 +110,10 @@ func TestUpdateSessions(t *testing.T) {
s := NewSession(ctx, []string{etcdAddr})
sessions, err := s.GetSessions("test")
sessions, rev, err := s.GetSessions("test")
assert.Nil(t, err)
assert.Equal(t, len(sessions), 0)
addCh, delCh := s.WatchServices("test")
eventCh := s.WatchServices("test", rev)
sList := []*Session{}
@ -129,29 +133,34 @@ func TestUpdateSessions(t *testing.T) {
wg.Wait()
assert.Eventually(t, func() bool {
sessions, _ := s.GetSessions("test")
sessions, _, _ := s.GetSessions("test")
return len(sessions) == 10
}, 10*time.Second, 100*time.Millisecond)
notExistSessions, _ := s.GetSessions("testt")
notExistSessions, _, _ := s.GetSessions("testt")
assert.Equal(t, len(notExistSessions), 0)
etcdKV.RemoveWithPrefix("")
etcdKV.RemoveWithPrefix(DefaultServiceRoot)
assert.Eventually(t, func() bool {
sessions, _ := s.GetSessions("test")
sessions, _, _ := s.GetSessions("test")
return len(sessions) == 0
}, 10*time.Second, 100*time.Millisecond)
addSessions := []*Session{}
for i := 0; i < 10; i++ {
session := <-addCh
addSessions = append(addSessions, session)
sessionEvents := []*SessionEvent{}
addEventLen := 0
delEventLen := 0
eventLength := len(eventCh)
for i := 0; i < eventLength; i++ {
sessionEvent := <-eventCh
if sessionEvent.EventType == SessionAddEvent {
addEventLen++
}
if sessionEvent.EventType == SessionDelEvent {
delEventLen++
}
sessionEvents = append(sessionEvents, sessionEvent)
}
assert.Equal(t, len(addSessions), 10)
assert.Equal(t, len(sessionEvents), 20)
assert.Equal(t, addEventLen, 10)
assert.Equal(t, delEventLen, 10)
delSessions := []*Session{}
for i := 0; i < 10; i++ {
session := <-delCh
delSessions = append(delSessions, session)
}
assert.Equal(t, len(addSessions), 10)
}