Improve session manager (#5310)

Improve session manager.
Issue #5174

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5323/head
godchen 2021-05-20 15:07:25 +08:00 committed by GitHub
parent f18dfb4ff6
commit 913712fbea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 11 deletions

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
@ -233,10 +234,12 @@ func (sm *SessionManager) UpdateSessions(prefix string) error {
// GetSessions gets all the services saved in memory.
// Before GetSessions, you should WatchServices or UpdateSessions first.
func (sm *SessionManager) GetSessions() map[string]*Session {
func (sm *SessionManager) GetSessions(prefix string) map[string]*Session {
sessions := map[string]*Session{}
sm.Sessions.Range(func(key, value interface{}) bool {
sessions[fmt.Sprint(key)] = value.(*Session)
if strings.Contains(fmt.Sprint(key), prefix) {
sessions[fmt.Sprint(key)] = value.(*Session)
}
return true
})
return sessions
@ -246,7 +249,9 @@ func (sm *SessionManager) GetSessions() map[string]*Session {
// sessions. If a server up, it will be add to sessions. But it won't get the
// sessions startup before watch start.
// UpdateSessions and WatchServices is recommended.
func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) {
func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) {
addCh := make(chan *Session, 10)
delCh := make(chan *Session, 10)
rch := sm.etcdKV.WatchWithPrefix(defaultServiceRoot + prefix)
go func() {
for {
@ -269,16 +274,21 @@ func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) {
continue
}
sm.Sessions.Store(string(ev.Kv.Key), session)
addCh <- session
case mvccpb.DELETE:
log.Debug("watch services",
zap.Any("delete kv", ev.Kv))
sm.Sessions.Delete(string(ev.Kv.Key))
value, isloaded := sm.Sessions.LoadAndDelete(string(ev.Kv.Key))
if isloaded {
delCh <- value.(*Session)
}
}
}
}
}
}()
return addCh, delCh
}
func initEtcd(etcdAddress, rootPath string) (*etcdkv.EtcdKV, error) {

View File

@ -113,10 +113,10 @@ func TestUpdateSessions(t *testing.T) {
self := NewSession("test", "testAddr", false)
sm := NewSessionManager(ctx, etcdAddr, rootPath, self)
sm.WatchServices(ctx, "test")
err = sm.UpdateSessions("test")
assert.Nil(t, err)
addCh, delCh := sm.WatchServices(ctx, "test")
sessionManagers := make([]*SessionManager, 0)
@ -136,15 +136,27 @@ func TestUpdateSessions(t *testing.T) {
}
wg.Wait()
assert.Equal(t, len(sm.GetSessions()), 10)
sessions := sm.GetSessions()
assert.Nil(t, err)
assert.Equal(t, len(sessions), 10)
assert.Eventually(t, func() bool {
return len(sm.GetSessions("test")) == 10
}, 10*time.Second, 100*time.Millisecond)
assert.Equal(t, len(sm.GetSessions("testt")), 0)
etcdKV.RemoveWithPrefix("")
assert.Eventually(t, func() bool {
return len(sm.GetSessions()) == 0
return len(sm.GetSessions("test")) == 0
}, 10*time.Second, 100*time.Millisecond)
addSessions := []*Session{}
for i := 0; i < 10; i++ {
session := <-addCh
addSessions = append(addSessions, session)
}
assert.Equal(t, len(addSessions), 10)
delSessions := []*Session{}
for i := 0; i < 10; i++ {
session := <-delCh
delSessions = append(delSessions, session)
}
assert.Equal(t, len(addSessions), 10)
}