Refactor session WatchSessions to allow rewatch when Rev Compacted (#12880)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/12942/head
congqixia 2021-12-08 10:11:04 +08:00 committed by GitHub
parent 03c27b17e2
commit de454956fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 256 additions and 54 deletions

View File

@ -370,7 +370,7 @@ func (s *Server) initServiceDiscovery() error {
s.cluster.Startup(datanodes)
s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
return nil
}

View File

@ -115,7 +115,7 @@ func (cm *ConnectionManager) AddDependency(roleName string) error {
}
}
eventChannel := cm.session.WatchServices(roleName, rev)
eventChannel := cm.session.WatchServices(roleName, rev, nil)
go cm.processEvent(eventChannel)
return nil

View File

@ -173,7 +173,7 @@ func (i *IndexCoord) Init() error {
}
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.nodeClients)))
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1)
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)
nodeTasks := i.metaTable.GetNodeTaskStats()
for nodeID, taskNum := range nodeTasks {
i.nodeManager.pq.UpdatePriority(nodeID, taskNum)

View File

@ -331,7 +331,7 @@ func (qc *QueryCoord) watchNodeLoop() {
log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
}
qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1)
qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil)
for {
select {
case <-ctx.Done():

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
@ -31,6 +32,11 @@ const (
// SessionEventType session event type
type SessionEventType int
// Rewatch defines the behavior outer session watch handles ErrCompacted
// it should process the current full list of session
// and returns err if meta error or anything else goes wrong
type Rewatch func(sessions map[string]*Session) error
const (
// SessionNoneEvent place holder for zero value
SessionNoneEvent SessionEventType = iota
@ -304,6 +310,36 @@ type SessionEvent struct {
Session *Session
}
type sessionWatcher struct {
s *Session
rch clientv3.WatchChan
eventCh chan *SessionEvent
prefix string
rewatch Rewatch
}
func (w *sessionWatcher) start() {
go func() {
for {
select {
case <-w.s.ctx.Done():
return
case wresp, ok := <-w.rch:
if !ok {
return
}
err := w.handleWatchResponse(wresp)
// internal error not handled,goroutine quit
if err != nil {
log.Warn("watch goroutine found error", zap.Error(err))
return
}
}
}
}()
}
// WatchServices watch the service's up and down in etcd, and send event to
// eventChannel.
// prefix is a parameter to know which service to watch and can be obtained in
@ -312,58 +348,86 @@ type SessionEvent struct {
// in GetSessions.
// If a server up, a event will be add to channel with eventType SessionAddType.
// If a server down, a event will be add to channel with eventType SessionDelType.
func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-chan *SessionEvent) {
eventCh := make(chan *SessionEvent, 100)
rch := s.etcdCli.Watch(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
go func() {
for {
select {
case <-s.ctx.Done():
return
case wresp, ok := <-rch:
if !ok {
return
}
if wresp.Err() != nil {
//close event channel
log.Warn("Watch service found error", zap.Error(wresp.Err()))
close(eventCh)
return
}
for _, ev := range wresp.Events {
session := &Session{}
var eventType SessionEventType
switch ev.Type {
case mvccpb.PUT:
log.Debug("watch services",
zap.Any("add kv", ev.Kv))
err := json.Unmarshal([]byte(ev.Kv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
eventType = SessionAddEvent
case mvccpb.DELETE:
log.Debug("watch services",
zap.Any("delete kv", ev.PrevKv))
err := json.Unmarshal([]byte(ev.PrevKv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
eventType = SessionDelEvent
}
log.Debug("WatchService", zap.Any("event type", eventType))
eventCh <- &SessionEvent{
EventType: eventType,
Session: session,
}
}
func (s *Session) WatchServices(prefix string, revision int64, rewatch Rewatch) (eventChannel <-chan *SessionEvent) {
w := &sessionWatcher{
s: s,
eventCh: make(chan *SessionEvent, 100),
rch: s.etcdCli.Watch(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)),
prefix: prefix,
rewatch: rewatch,
}
w.start()
return w.eventCh
}
func (w *sessionWatcher) handleWatchResponse(wresp clientv3.WatchResponse) error {
if wresp.Err() != nil {
return w.handleWatchErr(wresp.Err())
}
for _, ev := range wresp.Events {
session := &Session{}
var eventType SessionEventType
switch ev.Type {
case mvccpb.PUT:
log.Debug("watch services",
zap.Any("add kv", ev.Kv))
err := json.Unmarshal([]byte(ev.Kv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
eventType = SessionAddEvent
case mvccpb.DELETE:
log.Debug("watch services",
zap.Any("delete kv", ev.PrevKv))
err := json.Unmarshal([]byte(ev.PrevKv.Value), session)
if err != nil {
log.Error("watch services", zap.Error(err))
continue
}
eventType = SessionDelEvent
}
}()
return eventCh
log.Debug("WatchService", zap.Any("event type", eventType))
w.eventCh <- &SessionEvent{
EventType: eventType,
Session: session,
}
}
return nil
}
func (w *sessionWatcher) handleWatchErr(err error) error {
// if not ErrCompacted, just close the channel
if err != v3rpc.ErrCompacted {
//close event channel
log.Warn("Watch service found error", zap.Error(err))
close(w.eventCh)
return err
}
// rewatch is nil, no logic to handle
if w.rewatch == nil {
log.Warn("Watch service with ErrCompacted but no rewatch logic provided")
close(w.eventCh)
return err
}
sessions, revision, err := w.s.GetSessions(w.prefix)
if err != nil {
log.Warn("GetSession before rewatch failed", zap.String("prefix", w.prefix), zap.Error(err))
close(w.eventCh)
return err
}
err = w.rewatch(sessions)
if err != nil {
log.Warn("WatchServices rewatch failed", zap.String("prefix", w.prefix), zap.Error(err))
close(w.eventCh)
return err
}
w.rch = w.s.etcdCli.Watch(w.s.ctx, path.Join(w.s.metaRoot, DefaultServiceRoot, w.prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
return nil
}
// LivenessCheck performs liveness check with provided context and channel

View File

@ -2,6 +2,7 @@ package sessionutil
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
@ -13,6 +14,11 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/mvccpb"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
)
var Params paramtable.BaseTable
@ -115,7 +121,7 @@ func TestUpdateSessions(t *testing.T) {
sessions, rev, err := s.GetSessions("test")
assert.Nil(t, err)
assert.Equal(t, len(sessions), 0)
eventCh := s.WatchServices("test", rev)
eventCh := s.WatchServices("test", rev, nil)
sList := []*Session{}
@ -203,6 +209,138 @@ func TestSessionLivenessCheck(t *testing.T) {
assert.False(t, flag)
}
func TestWatcherHandleWatchResp(t *testing.T) {
ctx := context.Background()
Params.Init()
endpoints, err := Params.Load("_EtcdEndpoints")
require.NoError(t, err)
etcdEndpoints := strings.Split(endpoints, ",")
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, "/by-dev/session-ut")
require.NoError(t, err)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("/by-dev/session-ut")
s := NewSession(ctx, metaRoot, etcdEndpoints)
defer s.Revoke(time.Second)
getWatcher := func(s *Session, rewatch Rewatch) *sessionWatcher {
return &sessionWatcher{
s: s,
prefix: "test",
rewatch: rewatch,
eventCh: make(chan *SessionEvent, 10),
}
}
t.Run("handle normal events", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Value: []byte(`{"ServerID": 1, "ServerName": "test1"}`),
},
},
{
Type: mvccpb.DELETE,
PrevKv: &mvccpb.KeyValue{
Value: []byte(`{"ServerID": 2, "ServerName": "test2"}`),
},
},
},
}
err := w.handleWatchResponse(wresp)
assert.NoError(t, err)
assert.Equal(t, 2, len(w.eventCh))
})
t.Run("handle abnormal events", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Value: []byte(``),
},
},
{
Type: mvccpb.DELETE,
PrevKv: &mvccpb.KeyValue{
Value: []byte(``),
},
},
},
}
var err error
assert.NotPanics(t, func() {
err = w.handleWatchResponse(wresp)
})
assert.NoError(t, err)
assert.Equal(t, 0, len(w.eventCh))
})
t.Run("err compacted resp, nil Rewatch", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
assert.Error(t, err)
assert.Equal(t, v3rpc.ErrCompacted, err)
})
t.Run("err compacted resp, valid Rewatch", func(t *testing.T) {
w := getWatcher(s, func(sessions map[string]*Session) error {
return nil
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
assert.NoError(t, err)
})
t.Run("err canceled", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
Canceled: true,
}
err := w.handleWatchResponse(wresp)
assert.Error(t, err)
})
t.Run("err handled but list failed", func(t *testing.T) {
s := NewSession(ctx, "/by-dev/session-ut", etcdEndpoints)
s.etcdCli.Close()
w := getWatcher(s, func(sessions map[string]*Session) error {
return nil
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err = w.handleWatchResponse(wresp)
assert.Error(t, err)
})
t.Run("err handled but rewatch failed", func(t *testing.T) {
w := getWatcher(s, func(sessions map[string]*Session) error {
return errors.New("mocked")
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
assert.Error(t, err)
})
}
func TestSessionRevoke(t *testing.T) {
s := &Session{}
assert.NotPanics(t, func() {