Close event channel when watch event found error (#9819)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/9911/head
congqixia 2021-10-14 19:20:35 +08:00 committed by GitHub
parent 94e4dfd226
commit 6b8ff60de3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 131 additions and 6 deletions

View File

@ -432,6 +432,10 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
}
}
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
func (s *Server) startWatchService(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
@ -440,7 +444,11 @@ func (s *Server) startWatchService(ctx context.Context) {
case <-ctx.Done():
log.Debug("watch service shutdown")
return
case event := <-s.eventCh:
case event, ok := <-s.eventCh:
if !ok {
//TODO add retry logic
return
}
if err := s.handleSessionEvent(ctx, event); err != nil {
go func() {
if err := s.Stop(); err != nil {

View File

@ -572,6 +572,42 @@ func TestGetFlushedSegments(t *testing.T) {
})
}
func TestService_WatchServices(t *testing.T) {
svr := newTestServer(t, nil)
ech := make(chan *sessionutil.SessionEvent)
svr.eventCh = ech
flag := false
signal := make(chan struct{}, 1)
go func() {
svr.startWatchService(context.Background())
flag = true
signal <- struct{}{}
}()
close(ech)
<-signal
assert.True(t, flag)
ech = make(chan *sessionutil.SessionEvent)
flag = false
svr.eventCh = ech
ctx, cancel := context.WithCancel(context.Background())
go func() {
svr.startWatchService(ctx)
flag = true
signal <- struct{}{}
}()
ech <- nil
cancel()
<-signal
assert.True(t, flag)
}
func TestServer_GetMetrics(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

View File

@ -200,6 +200,10 @@ func (cm *ConnectionManager) Stop() {
}
}
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
func (cm *ConnectionManager) processEvent(channel <-chan *sessionutil.SessionEvent) {
for {
select {
@ -207,11 +211,16 @@ func (cm *ConnectionManager) processEvent(channel <-chan *sessionutil.SessionEve
if !ok {
return
}
case ev := <-channel:
if ev.EventType == sessionutil.SessionAddEvent {
case ev, ok := <-channel:
if !ok {
//TODO silverxia add retry logic
return
}
switch ev.EventType {
case sessionutil.SessionAddEvent:
log.Debug("ConnectionManager", zap.Any("add event", ev.Session))
cm.buildConnections(ev.Session)
} else if ev.EventType == sessionutil.SessionDelEvent {
case sessionutil.SessionDelEvent:
cm.removeTask(ev.Session.ServerID)
cm.removeConnection(ev.Session.ServerID)
}

View File

@ -150,6 +150,36 @@ func TestConnectionManager(t *testing.T) {
})
}
func TestConnectionManager_processEvent(t *testing.T) {
cm := &ConnectionManager{
closeCh: make(chan struct{}),
}
ech := make(chan *sessionutil.SessionEvent)
flag := false
signal := make(chan struct{}, 1)
go func() {
cm.processEvent(ech)
flag = true
signal <- struct{}{}
}()
close(ech)
<-signal
assert.True(t, flag)
ech = make(chan *sessionutil.SessionEvent)
flag = false
go func() {
cm.processEvent(ech)
flag = true
signal <- struct{}{}
}()
close(cm.closeCh)
<-signal
assert.True(t, flag)
}
type testRootCoord struct {
rootcoordpb.RootCoordServer
}

View File

@ -652,6 +652,10 @@ func (i *IndexCoord) recycleUnusedIndexFiles() {
}
// watchNodeLoop is used to monitor IndexNode going online and offline.
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
func (i *IndexCoord) watchNodeLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
@ -663,7 +667,11 @@ func (i *IndexCoord) watchNodeLoop() {
select {
case <-ctx.Done():
return
case event := <-i.eventChan:
case event, ok := <-i.eventChan:
if !ok {
//TODO silverxia add retry
return
}
log.Debug("IndexCoord watchNodeLoop event updated")
switch event.EventType {
case sessionutil.SessionAddEvent:

View File

@ -14,6 +14,7 @@ package indexcoord
import (
"context"
"math/rand"
"sync"
"testing"
"time"
@ -28,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
@ -201,3 +203,26 @@ func TestIndexCoord(t *testing.T) {
err = ic.Stop()
assert.Nil(t, err)
}
func TestIndexCoord_watchNodeLoop(t *testing.T) {
ech := make(chan *sessionutil.SessionEvent)
in := &IndexCoord{
loopWg: sync.WaitGroup{},
loopCtx: context.Background(),
eventChan: ech,
}
in.loopWg.Add(1)
flag := false
signal := make(chan struct{}, 1)
go func() {
in.watchNodeLoop()
flag = true
signal <- struct{}{}
}()
close(ech)
<-signal
assert.True(t, flag)
}

View File

@ -256,7 +256,10 @@ func (qc *QueryCoord) watchNodeLoop() {
select {
case <-ctx.Done():
return
case event := <-qc.eventChan:
case event, ok := <-qc.eventChan:
if !ok {
return
}
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID

View File

@ -309,6 +309,12 @@ func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-c
if !ok {
return
}
if wresp.Err() != nil {
//close event channel
log.Warn("Watch service found error", zap.Error(wresp.Err()))
close(eventCh)
return
}
for _, ev := range wresp.Events {
session := &Session{}
var eventType SessionEventType