mirror of https://github.com/milvus-io/milvus.git
Remove offLineNodes from watch states check (#16347)
This PR also - adds more information in log - makes Session able to logged by zap.Any/zap.String Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/16354/head
parent
7303036c19
commit
5494f3c318
|
@ -101,14 +101,16 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe
|
|||
select {
|
||||
case <-time.NewTimer(time.Until(timeoutT)).C:
|
||||
log.Info("timeout and stop timer: wait for channel ACK timeout",
|
||||
zap.String("state", watchState.String()),
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
zap.Time("timeout time", timeoutT))
|
||||
ackType := getAckType(watchState)
|
||||
c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID})
|
||||
case <-stop:
|
||||
log.Debug("stop timer before timeout",
|
||||
zap.String("state", watchState.String()),
|
||||
zap.String("watch state", watchState.String()),
|
||||
zap.Int64("nodeID", nodeID),
|
||||
zap.String("channel name", channelName),
|
||||
zap.Time("timeout time", timeoutT))
|
||||
}
|
||||
|
|
|
@ -121,7 +121,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
|
|||
}
|
||||
|
||||
// Process watch states for old nodes.
|
||||
if err := c.checkOldNodes(oNodes); err != nil {
|
||||
oldOnLines := c.getOldOnlines(nodes, oNodes)
|
||||
if err := c.checkOldNodes(oldOnLines); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -152,8 +153,9 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error {
|
|||
}
|
||||
|
||||
log.Info("cluster start up",
|
||||
zap.Any("nodes", nodes),
|
||||
zap.Any("oNodes", oNodes),
|
||||
zap.Int64s("nodes", nodes),
|
||||
zap.Int64s("oNodes", oNodes),
|
||||
zap.Int64s("old onlines", oldOnLines),
|
||||
zap.Int64s("new onlines", newOnLines),
|
||||
zap.Int64s("offLines", offLines))
|
||||
return nil
|
||||
|
@ -176,6 +178,10 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error {
|
|||
for _, info := range watchInfos {
|
||||
channelName := info.GetVchan().GetChannelName()
|
||||
|
||||
log.Debug("processing watch info",
|
||||
zap.String("watch state", info.GetState().String()),
|
||||
zap.String("channel name", channelName))
|
||||
|
||||
switch info.GetState() {
|
||||
case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete:
|
||||
c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs())
|
||||
|
@ -257,6 +263,21 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// getOldOnlines returns a list of old online node ids in `old` and in `curr`.
|
||||
func (c *ChannelManager) getOldOnlines(curr []int64, old []int64) []int64 {
|
||||
mcurr := make(map[int64]struct{})
|
||||
ret := make([]int64, 0, len(old))
|
||||
for _, n := range curr {
|
||||
mcurr[n] = struct{}{}
|
||||
}
|
||||
for _, n := range old {
|
||||
if _, found := mcurr[n]; found {
|
||||
ret = append(ret, n)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// getNewOnLines returns a list of new online node ids in `curr` but not in `old`.
|
||||
func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 {
|
||||
mold := make(map[int64]struct{})
|
||||
|
@ -571,21 +592,21 @@ func (c *ChannelManager) processAck(e *ackEvent) {
|
|||
err := c.Release(e.nodeID, e.channelName)
|
||||
if err != nil {
|
||||
log.Warn("fail to set channels to release for watch failure ACKs",
|
||||
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
|
||||
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
|
||||
}
|
||||
|
||||
case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease
|
||||
err := c.cleanUpAndDelete(e.nodeID, e.channelName)
|
||||
if err != nil {
|
||||
log.Warn("fail to clean and delete channels for release failure ACKs",
|
||||
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
|
||||
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
|
||||
}
|
||||
|
||||
case releaseSuccessAck:
|
||||
err := c.toDelete(e.nodeID, e.channelName)
|
||||
if err != nil {
|
||||
log.Warn("fail to response to release success ACK",
|
||||
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName))
|
||||
zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -788,3 +788,31 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestChannelManager_HelperFunc(t *testing.T) {
|
||||
c := &ChannelManager{}
|
||||
t.Run("test getOldOnlines", func(t *testing.T) {
|
||||
tests := []struct {
|
||||
nodes []int64
|
||||
oNodes []int64
|
||||
|
||||
expectedOut []int64
|
||||
desription string
|
||||
}{
|
||||
{[]int64{}, []int64{}, []int64{}, "empty both"},
|
||||
{[]int64{1}, []int64{}, []int64{}, "empty oNodes"},
|
||||
{[]int64{}, []int64{1}, []int64{}, "empty nodes"},
|
||||
{[]int64{1}, []int64{1}, []int64{1}, "same one"},
|
||||
{[]int64{1, 2}, []int64{1}, []int64{1}, "same one 2"},
|
||||
{[]int64{1}, []int64{1, 2}, []int64{1}, "same one 3"},
|
||||
{[]int64{1, 2}, []int64{1, 2}, []int64{1, 2}, "same two"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.desription, func(t *testing.T) {
|
||||
nodes := c.getOldOnlines(test.nodes, test.oNodes)
|
||||
assert.ElementsMatch(t, test.expectedOut, nodes)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -117,6 +117,11 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b
|
|||
s.ServerID = serverID
|
||||
}
|
||||
|
||||
// String makes Session struct able to be logged by zap
|
||||
func (s *Session) String() string {
|
||||
return fmt.Sprintf("Session:<ServerID: %d, ServerName: %s>", s.ServerID, s.ServerName)
|
||||
}
|
||||
|
||||
// Register will process keepAliveResponse to keep alive with etcd.
|
||||
func (s *Session) Register() {
|
||||
ch, err := s.registerService()
|
||||
|
|
|
@ -12,10 +12,12 @@ import (
|
|||
"time"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
@ -399,3 +401,8 @@ func TestSession_Registered(t *testing.T) {
|
|||
session.UpdateRegistered(true)
|
||||
assert.True(t, session.Registered())
|
||||
}
|
||||
|
||||
func TestSession_String(t *testing.T) {
|
||||
s := &Session{}
|
||||
log.Debug("log session", zap.Any("session", s))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue