Fix etcd watch error (#17364)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/17385/head
Xiaofan 2022-06-06 16:26:06 +08:00 committed by GitHub
parent 45febac298
commit af994b5e1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 94 additions and 51 deletions

View File

@ -682,6 +682,7 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
// REF MEP#7 watchInfo paths are orgnized as: [prefix]/channel/{node_id}/{channel_name}
watchPrefix := Params.DataCoordCfg.ChannelWatchSubPath
// TODO, this is risky, we'd better watch etcd with revision rather simply a path
etcdWatcher, timeoutWatcher := c.stateTimer.getWatchers(watchPrefix)
for {
@ -693,16 +694,22 @@ func (c *ChannelManager) watchChannelStatesLoop(ctx context.Context) {
log.Debug("receive timeout acks from state watcher",
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channel name", ackEvent.channelName))
c.processAck(ackEvent)
case event := <-etcdWatcher:
if event.Canceled {
log.Warn("watch channel canceled", zap.Error(event.Err()))
case event, ok := <-etcdWatcher:
if !ok {
log.Warn("datacoord failed to watch channel, return")
return
}
if err := event.Err(); err != nil {
log.Warn("datacoord watch channel hit error", zap.Error(event.Err()))
// https://github.com/etcd-io/etcd/issues/8980
if event.Err() == v3rpc.ErrCompacted {
go c.watchChannelStatesLoop(ctx)
return
}
// if watch loop return due to event canceled, the datacoord is not functional anymore
log.Panic("datacoord is not functional for event canceled")
log.Panic("datacoord is not functional for event canceled", zap.Error(err))
return
}
for _, evt := range event.Events {

View File

@ -227,6 +227,7 @@ func (node *DataNode) Init() error {
func (node *DataNode) StartWatchChannels(ctx context.Context) {
defer logutil.LogPanic()
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
// TODO, this is risky, we'd better watch etcd with revision rather simply a path
watchPrefix := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", Params.DataNodeCfg.GetNodeID()))
evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
// after watch, first check all exists nodes first
@ -240,20 +241,21 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
case <-ctx.Done():
log.Info("watch etcd loop quit")
return
case event := <-evtChan:
if event.Canceled { // event canceled
log.Warn("watch channel canceled", zap.Error(event.Err()))
case event, ok := <-evtChan:
if !ok {
log.Warn("datanode failed to watch channel, return")
return
}
if err := event.Err(); err != nil {
log.Warn("datanode watch channel canceled", zap.Error(event.Err()))
// https://github.com/etcd-io/etcd/issues/8980
if event.Err() == v3rpc.ErrCompacted {
go node.StartWatchChannels(ctx)
return
}
// if watch loop return due to event canceled, the datanode is not functional anymore
// stop the datanode and wait for restart
err := node.Stop()
if err != nil {
log.Warn("node stop failed", zap.Error(err))
}
log.Panic("datanode is not functional for event canceled", zap.Error(err))
return
}
for _, evt := range event.Events {

View File

@ -21,6 +21,7 @@ import (
"errors"
"math/rand"
"os"
"path"
"sort"
"strconv"
"sync"
@ -818,20 +819,27 @@ func (i *IndexCoord) watchMetaLoop() {
defer i.loopWg.Done()
log.Debug("IndexCoord watchMetaLoop start")
watchChan := i.metaTable.client.WatchWithPrefix("indexes")
watchChan := i.metaTable.client.WatchWithPrefix(indexFilePrefix)
for {
select {
case <-ctx.Done():
return
case resp := <-watchChan:
log.Debug("IndexCoord watchMetaLoop find meta updated.")
case resp, ok := <-watchChan:
if !ok {
log.Warn("index coord watch meta loop failed because watch channel closed")
return
}
if err := resp.Err(); err != nil {
log.Error("received error event from etcd watcher", zap.String("path", indexFilePrefix), zap.Error(err))
panic("failed to handle etcd request, exit..")
}
for _, event := range resp.Events {
eventRevision := event.Kv.Version
indexMeta := &indexpb.IndexMeta{}
err := proto.Unmarshal(event.Kv.Value, indexMeta)
indexBuildID := indexMeta.IndexBuildID
log.Debug("IndexCoord watchMetaLoop", zap.Int64("IndexBuildID", indexBuildID))
log.Info("IndexCoord watchMetaLoop", zap.Int64("IndexBuildID", indexBuildID))
if err != nil {
log.Warn("IndexCoord unmarshal indexMeta failed", zap.Int64("IndexBuildID", indexBuildID),
zap.Error(err))
@ -982,7 +990,7 @@ func (i *IndexCoord) assignTaskLoop() {
IndexName: meta.indexMeta.Req.IndexName,
IndexID: meta.indexMeta.Req.IndexID,
Version: meta.indexMeta.Version + 1,
MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10),
MetaPath: path.Join(indexFilePrefix, strconv.FormatInt(indexBuildID, 10)),
DataPaths: meta.indexMeta.Req.DataPaths,
TypeParams: meta.indexMeta.Req.TypeParams,
IndexParams: meta.indexMeta.Req.IndexParams,

View File

@ -69,7 +69,7 @@ func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) {
// reloadFromKV reloads the index meta from ETCD.
func (mt *metaTable) reloadFromKV() error {
mt.indexBuildID2Meta = make(map[UniqueID]Meta)
key := "indexes"
key := indexFilePrefix
log.Debug("IndexCoord metaTable LoadWithPrefix ", zap.String("prefix", key))
_, values, versions, err := mt.client.LoadWithPrefix2(key)
@ -115,8 +115,7 @@ func (mt *metaTable) saveIndexMeta(meta *Meta) error {
// reloadMeta reloads the index meta corresponding indexBuildID from ETCD.
func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
key := path.Join(indexFilePrefix, strconv.FormatInt(indexBuildID, 10))
_, values, version, err := mt.client.LoadWithPrefix2(key)
log.Debug("IndexCoord reloadMeta mt.client.LoadWithPrefix2", zap.Any("indexBuildID", indexBuildID), zap.Error(err))
if err != nil {
@ -340,7 +339,7 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) {
defer mt.lock.Unlock()
delete(mt.indexBuildID2Meta, indexBuildID)
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
key := path.Join(indexFilePrefix, strconv.FormatInt(indexBuildID, 10))
if err := mt.client.Remove(key); err != nil {
log.Error("IndexCoord delete index meta from etcd failed", zap.Error(err))

View File

@ -61,6 +61,7 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
)
// make sure QueryNode implements types.QueryNode
@ -325,7 +326,23 @@ func (node *QueryNode) watchChangeInfo() {
case <-node.queryNodeLoopCtx.Done():
log.Info("query node watchChangeInfo close")
return
case resp := <-watchChan:
case resp, ok := <-watchChan:
if !ok {
log.Warn("querynode failed to watch channel, return")
return
}
if err := resp.Err(); err != nil {
log.Warn("query watch channel canceled", zap.Error(resp.Err()))
// https://github.com/etcd-io/etcd/issues/8980
if resp.Err() == v3rpc.ErrCompacted {
go node.watchChangeInfo()
return
}
// if watch loop return due to event canceled, the datanode is not functional anymore
log.Panic("querynoe3 is not functional for event canceled", zap.Error(err))
return
}
for _, event := range resp.Events {
switch event.Type {
case mvccpb.PUT:

View File

@ -139,7 +139,10 @@ func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, repl
go nd.cancelClose(cancel)
nd.wg.Add(1)
go nd.watch(watchCh, collectionID, replicaID)
return
}
log.Error("failed to handle watch node error", zap.Error(err))
panic(err)
}
for _, e := range evt.Events {
switch e.Type {

View File

@ -129,6 +129,8 @@ func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID in
go sd.watch(watchCh, collectionID, replicaID, vchannel)
return
}
log.Error("failed to handle watch segment error, panic", zap.Error(err))
panic(err)
}
for _, e := range evt.Events {
switch e.Type {

View File

@ -111,8 +111,9 @@ func (p *proxyManager) startWatchEtcd(ctx context.Context, eventCh clientv3.Watc
return
}
if err := event.Err(); err != nil {
log.Error("received error event from etcd watcher", zap.Error(err))
return
// TODO do we need to retry watch etcd when ErrCompacted, but the init session func may not be idempotent so skip
log.Error("Watch proxy service failed", zap.Error(err))
panic(err)
}
for _, e := range event.Events {
var err error

View File

@ -329,15 +329,10 @@ func (w *sessionWatcher) start() {
return
case wresp, ok := <-w.rch:
if !ok {
log.Warn("session watch channel closed")
return
}
err := w.handleWatchResponse(wresp)
// internal error not handled,goroutine quit
if err != nil {
log.Warn("watch goroutine found error", zap.Error(err))
return
}
w.handleWatchResponse(wresp)
}
}
}()
@ -363,9 +358,14 @@ func (s *Session) WatchServices(prefix string, revision int64, rewatch Rewatch)
return w.eventCh
}
func (w *sessionWatcher) handleWatchResponse(wresp clientv3.WatchResponse) error {
func (w *sessionWatcher) handleWatchResponse(wresp clientv3.WatchResponse) {
if wresp.Err() != nil {
return w.handleWatchErr(wresp.Err())
err := w.handleWatchErr(wresp.Err())
if err != nil {
log.Error("failed to handle watch session response", zap.Error(err))
panic(err)
}
return
}
for _, ev := range wresp.Events {
session := &Session{}
@ -396,7 +396,6 @@ func (w *sessionWatcher) handleWatchResponse(wresp clientv3.WatchResponse) error
Session: session,
}
}
return nil
}
func (w *sessionWatcher) handleWatchErr(err error) error {

View File

@ -267,8 +267,10 @@ func TestWatcherHandleWatchResp(t *testing.T) {
},
},
}
err := w.handleWatchResponse(wresp)
assert.NoError(t, err)
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
assert.Equal(t, 2, len(w.eventCh))
})
@ -290,11 +292,9 @@ func TestWatcherHandleWatchResp(t *testing.T) {
},
},
}
var err error
assert.NotPanics(t, func() {
err = w.handleWatchResponse(wresp)
w.handleWatchResponse(wresp)
})
assert.NoError(t, err)
assert.Equal(t, 0, len(w.eventCh))
})
@ -303,8 +303,9 @@ func TestWatcherHandleWatchResp(t *testing.T) {
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
assert.NoError(t, err)
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err compacted resp, valid Rewatch", func(t *testing.T) {
@ -314,8 +315,9 @@ func TestWatcherHandleWatchResp(t *testing.T) {
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
assert.NoError(t, err)
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err canceled", func(t *testing.T) {
@ -323,8 +325,10 @@ func TestWatcherHandleWatchResp(t *testing.T) {
wresp := clientv3.WatchResponse{
Canceled: true,
}
err := w.handleWatchResponse(wresp)
assert.Error(t, err)
assert.Panics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err handled but rewatch failed", func(t *testing.T) {
@ -334,10 +338,9 @@ func TestWatcherHandleWatchResp(t *testing.T) {
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
err := w.handleWatchResponse(wresp)
t.Log(err.Error())
assert.Error(t, err)
assert.Panics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err handled but list failed", func(t *testing.T) {
@ -350,8 +353,10 @@ func TestWatcherHandleWatchResp(t *testing.T) {
CompactRevision: 1,
}
err = w.handleWatchResponse(wresp)
assert.Error(t, err)
assert.Panics(t, func() {
w.handleWatchResponse(wresp)
})
})
}