mirror of https://github.com/milvus-io/milvus.git
Handle etcd compacted error (#17886)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
pull/17967/head
parent
cdbd75d4dd
commit
01fc411566
|
@ -19,7 +19,9 @@ package distributed
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
|
@ -232,7 +234,13 @@ func (cm *ConnectionManager) processEvent(channel <-chan *sessionutil.SessionEve
|
|||
}
|
||||
case ev, ok := <-channel:
|
||||
if !ok {
|
||||
//TODO silverxia add retry logic
|
||||
log.Error("watch service channel closed", zap.Int64("serverID", cm.session.ServerID))
|
||||
go cm.Stop()
|
||||
if cm.session.TriggerKill {
|
||||
if p, err := os.FindProcess(os.Getpid()); err == nil {
|
||||
p.Signal(syscall.SIGINT)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
switch ev.EventType {
|
||||
|
|
|
@ -19,7 +19,10 @@ package distributed
|
|||
import (
|
||||
"context"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -175,33 +178,68 @@ func TestConnectionManager(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestConnectionManager_processEvent(t *testing.T) {
|
||||
cm := &ConnectionManager{
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
t.Run("close closeCh", func(t *testing.T) {
|
||||
cm := &ConnectionManager{
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
ech := make(chan *sessionutil.SessionEvent)
|
||||
flag := false
|
||||
signal := make(chan struct{}, 1)
|
||||
go func() {
|
||||
cm.processEvent(ech)
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
}()
|
||||
ech := make(chan *sessionutil.SessionEvent)
|
||||
flag := false
|
||||
signal := make(chan struct{}, 1)
|
||||
go func() {
|
||||
assert.Panics(t, func() {
|
||||
cm.processEvent(ech)
|
||||
})
|
||||
|
||||
close(ech)
|
||||
<-signal
|
||||
assert.True(t, flag)
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
}()
|
||||
|
||||
ech = make(chan *sessionutil.SessionEvent)
|
||||
flag = false
|
||||
go func() {
|
||||
cm.processEvent(ech)
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
}()
|
||||
close(cm.closeCh)
|
||||
<-signal
|
||||
assert.True(t, flag)
|
||||
close(ech)
|
||||
<-signal
|
||||
assert.True(t, flag)
|
||||
|
||||
ech = make(chan *sessionutil.SessionEvent)
|
||||
flag = false
|
||||
go func() {
|
||||
cm.processEvent(ech)
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
}()
|
||||
close(cm.closeCh)
|
||||
<-signal
|
||||
assert.True(t, flag)
|
||||
})
|
||||
|
||||
t.Run("close watch chan", func(t *testing.T) {
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc, syscall.SIGINT)
|
||||
defer signal.Reset(syscall.SIGINT)
|
||||
sigQuit := make(chan struct{}, 1)
|
||||
|
||||
cm := &ConnectionManager{
|
||||
closeCh: make(chan struct{}),
|
||||
session: &sessionutil.Session{
|
||||
ServerID: 1,
|
||||
TriggerKill: true,
|
||||
},
|
||||
}
|
||||
|
||||
ech := make(chan *sessionutil.SessionEvent)
|
||||
|
||||
go func() {
|
||||
<-sc
|
||||
sigQuit <- struct{}{}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
cm.processEvent(ech)
|
||||
}()
|
||||
|
||||
close(ech)
|
||||
|
||||
<-sigQuit
|
||||
})
|
||||
}
|
||||
|
||||
type testRootCoord struct {
|
||||
|
|
|
@ -876,10 +876,10 @@ func (i *IndexCoord) watchMetaLoop() {
|
|||
}
|
||||
if err := resp.Err(); err != nil {
|
||||
if err == v3rpc.ErrCompacted {
|
||||
newMetaTable, err := NewMetaTable(i.metaTable.client)
|
||||
if err != nil {
|
||||
newMetaTable, err2 := NewMetaTable(i.metaTable.client)
|
||||
if err2 != nil {
|
||||
log.Error("Constructing new meta table fails when etcd has a compaction error",
|
||||
zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(err))
|
||||
zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(err2))
|
||||
panic("failed to handle etcd request, exit..")
|
||||
}
|
||||
i.metaTable = newMetaTable
|
||||
|
|
|
@ -29,6 +29,8 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
@ -511,7 +513,32 @@ func (qc *QueryCoord) handoffNotificationLoop() {
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case resp := <-watchChan:
|
||||
case resp, ok := <-watchChan:
|
||||
if !ok {
|
||||
log.Warn("QueryCoord watch handoff segment loop failed because watch channel is closed")
|
||||
panic("QueryCoord watch handoff segment loop failed because watch channel is closed")
|
||||
}
|
||||
if err := resp.Err(); err != nil {
|
||||
// https://github.com/etcd-io/etcd/issues/8980
|
||||
if err == v3rpc.ErrCompacted {
|
||||
qc.handoffHandler, err = newHandoffHandler(qc.loopCtx, qc.kvClient, qc.meta, qc.cluster, qc.scheduler, qc.broker)
|
||||
if err != nil {
|
||||
log.Error("query coordinator re new handoff handler failed", zap.Error(err))
|
||||
panic("failed to handle etcd request, exit..")
|
||||
}
|
||||
if err2 := qc.handoffHandler.reloadFromKV(); err2 != nil {
|
||||
log.Error("reload index checker meta fails when etcd has a compaction error",
|
||||
zap.String("etcd error", err.Error()), zap.Error(err2))
|
||||
panic("failed to handle etcd request, exit..")
|
||||
}
|
||||
qc.loopWg.Add(1)
|
||||
go qc.handoffNotificationLoop()
|
||||
return
|
||||
}
|
||||
log.Error("received error event from etcd watcher", zap.String("prefix", util.HandoffSegmentPrefix),
|
||||
zap.Error(err))
|
||||
panic("failed to handle etcd request, exit..")
|
||||
}
|
||||
for _, event := range resp.Events {
|
||||
segmentInfo := &querypb.SegmentInfo{}
|
||||
err := proto.Unmarshal(event.Kv.Value, segmentInfo)
|
||||
|
|
|
@ -23,7 +23,9 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -630,3 +632,110 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
|
|||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestQueryCoord_watchHandoffSegmentLoop(t *testing.T) {
|
||||
Params.Init()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
|
||||
|
||||
qc := &QueryCoord{
|
||||
loopCtx: ctx,
|
||||
loopWg: sync.WaitGroup{},
|
||||
kvClient: etcdKV,
|
||||
handoffHandler: &HandoffHandler{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
client: etcdKV,
|
||||
},
|
||||
}
|
||||
|
||||
t.Run("chan closed", func(t *testing.T) {
|
||||
qc.loopWg.Add(1)
|
||||
go func() {
|
||||
assert.Panics(t, func() {
|
||||
qc.handoffNotificationLoop()
|
||||
})
|
||||
}()
|
||||
|
||||
etcdCli.Close()
|
||||
qc.loopWg.Wait()
|
||||
})
|
||||
|
||||
t.Run("etcd compaction", func(t *testing.T) {
|
||||
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||
assert.Nil(t, err)
|
||||
etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
|
||||
qc.kvClient = etcdKV
|
||||
qc.handoffHandler.client = etcdKV
|
||||
qc.handoffHandler.revision = 0
|
||||
qc.meta = &MetaReplica{}
|
||||
qc.handoffHandler.meta = qc.meta
|
||||
qc.handoffHandler.tasks = make(map[int64]*HandOffTask)
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
segInfo := &querypb.SegmentInfo{
|
||||
SegmentID: UniqueID(i),
|
||||
}
|
||||
v, err := proto.Marshal(segInfo)
|
||||
assert.Nil(t, err)
|
||||
key := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i))
|
||||
err = etcdKV.Save(key, string(v))
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
// The reason there the error is no handle is that if you run compact twice, an error will be reported;
|
||||
// error msg is "etcdserver: mvcc: required revision has been compacted"
|
||||
etcdCli.Compact(ctx, 10)
|
||||
qc.loopWg.Add(1)
|
||||
go qc.handoffNotificationLoop()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
for i := 1; i < 10; i++ {
|
||||
k := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i))
|
||||
_, err = etcdCli.Delete(ctx, k)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
cancel()
|
||||
qc.loopWg.Wait()
|
||||
})
|
||||
|
||||
t.Run("etcd compaction and reload failed", func(t *testing.T) {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
etcdCli, err = etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||
assert.Nil(t, err)
|
||||
etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
|
||||
qc.loopCtx = ctx
|
||||
qc.loopCancel = cancel
|
||||
qc.kvClient = etcdKV
|
||||
qc.handoffHandler.client = etcdKV
|
||||
qc.handoffHandler.revision = 0
|
||||
qc.handoffHandler.tasks = make(map[int64]*HandOffTask)
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
key := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i))
|
||||
v := "segment-" + strconv.Itoa(i)
|
||||
err = etcdKV.Save(key, v)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
// The reason there the error is no handle is that if you run compact twice, an error will be reported;
|
||||
// error msg is "etcdserver: mvcc: required revision has been compacted"
|
||||
etcdCli.Compact(ctx, 10)
|
||||
qc.loopWg.Add(1)
|
||||
go func() {
|
||||
assert.Panics(t, func() {
|
||||
qc.handoffNotificationLoop()
|
||||
})
|
||||
}()
|
||||
qc.loopWg.Wait()
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
k := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i))
|
||||
_, err = etcdCli.Delete(ctx, k)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import (
|
|||
"path"
|
||||
"sync"
|
||||
|
||||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
|
@ -82,6 +84,7 @@ func (p *proxyManager) WatchProxy() error {
|
|||
return err
|
||||
}
|
||||
log.Debug("succeed to init sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev))
|
||||
// all init function should be clear meta firstly.
|
||||
for _, f := range p.initSessionsFunc {
|
||||
f(sessions)
|
||||
}
|
||||
|
@ -105,13 +108,22 @@ func (p *proxyManager) startWatchEtcd(ctx context.Context, eventCh clientv3.Watc
|
|||
case <-ctx.Done():
|
||||
log.Warn("stop watching etcd loop")
|
||||
return
|
||||
// TODO @xiaocai2333: watch proxy by session WatchService.
|
||||
case event, ok := <-eventCh:
|
||||
if !ok {
|
||||
log.Warn("stop watching etcd loop due to closed etcd event channel")
|
||||
return
|
||||
panic("stop watching etcd loop due to closed etcd event channel")
|
||||
}
|
||||
if err := event.Err(); err != nil {
|
||||
// TODO do we need to retry watch etcd when ErrCompacted, but the init session func may not be idempotent so skip
|
||||
if err == v3rpc.ErrCompacted {
|
||||
err2 := p.WatchProxy()
|
||||
if err2 != nil {
|
||||
log.Error("re watch proxy fails when etcd has a compaction error",
|
||||
zap.String("etcd error", err.Error()), zap.Error(err2))
|
||||
panic("failed to handle etcd request, exit..")
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Error("Watch proxy service failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
|
@ -182,7 +194,7 @@ func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se
|
|||
session, err := p.parseSession(v.Value)
|
||||
if err != nil {
|
||||
log.Debug("failed to unmarshal session", zap.Error(err))
|
||||
continue
|
||||
return nil, 0, err
|
||||
}
|
||||
sessions = append(sessions, session)
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"path"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -98,3 +99,49 @@ func TestProxyManager(t *testing.T) {
|
|||
pm.Stop()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestProxyManager_ErrCompacted(t *testing.T) {
|
||||
Params.Init()
|
||||
|
||||
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||
assert.Nil(t, err)
|
||||
defer etcdCli.Close()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot)
|
||||
f1 := func(sess []*sessionutil.Session) {
|
||||
t.Log("get sessions num", len(sess))
|
||||
}
|
||||
pm := newProxyManager(ctx, etcdCli, f1)
|
||||
|
||||
eventCh := pm.etcdCli.Watch(
|
||||
pm.ctx,
|
||||
path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole),
|
||||
clientv3.WithPrefix(),
|
||||
clientv3.WithCreatedNotify(),
|
||||
clientv3.WithPrevKV(),
|
||||
clientv3.WithRev(1),
|
||||
)
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
k := path.Join(sessKey, typeutil.ProxyRole+strconv.FormatInt(int64(i), 10))
|
||||
v := "invalid session: " + strconv.FormatInt(int64(i), 10)
|
||||
_, err = etcdCli.Put(ctx, k, v)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
// The reason there the error is no handle is that if you run compact twice, an error will be reported;
|
||||
// error msg is "etcdserver: mvcc: required revision has been compacted"
|
||||
etcdCli.Compact(ctx, 10)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
pm.startWatchEtcd(pm.ctx, eventCh)
|
||||
})
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
k := path.Join(sessKey, typeutil.ProxyRole+strconv.FormatInt(int64(i), 10))
|
||||
_, err = etcdCli.Delete(ctx, k)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -276,6 +276,8 @@ func (t *timetickSync) delSession(sess *sessionutil.Session) {
|
|||
func (t *timetickSync) initSessions(sess []*sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.sess2ChanTsMap = make(map[typeutil.UniqueID]*chanTsMsg)
|
||||
t.sess2ChanTsMap[t.sourceID] = nil
|
||||
for _, s := range sess {
|
||||
t.sess2ChanTsMap[s.ServerID] = nil
|
||||
log.Debug("Init proxy sessions for timeticksync", zap.Int64("serverID", s.ServerID))
|
||||
|
|
Loading…
Reference in New Issue