mirror of https://github.com/milvus-io/milvus.git
Fix coordinator fast restart (#28205)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/28218/head
parent
e8950a529e
commit
bbcaf7a703
|
@ -512,7 +512,7 @@ common:
|
|||
# superUsers: root
|
||||
tlsMode: 0
|
||||
session:
|
||||
ttl: 60 # ttl value when session granting a lease to register service
|
||||
ttl: 30 # ttl value when session granting a lease to register service
|
||||
retryTimes: 30 # retry times when session sending etcd requests
|
||||
|
||||
# preCreatedTopic decides whether using existed topic
|
||||
|
|
|
@ -432,11 +432,12 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
|
|||
0)).
|
||||
Then(clientv3.OpPut(completeKey, string(sessionJSON), clientv3.WithLease(resp.ID))).Commit()
|
||||
if err != nil {
|
||||
log.Warn("compare and swap error, maybe the key has already been registered", zap.Error(err))
|
||||
log.Warn("register on etcd error, check the availability of etcd ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if !txnResp.Succeeded {
|
||||
if txnResp != nil && !txnResp.Succeeded {
|
||||
s.handleRestart(completeKey)
|
||||
return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", s.ServerName)
|
||||
}
|
||||
log.Info("put session key into etcd", zap.String("key", completeKey), zap.String("value", string(sessionJSON)))
|
||||
|
@ -459,6 +460,35 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
|
|||
return ch, nil
|
||||
}
|
||||
|
||||
// Handle restart is fast path to handle node restart.
|
||||
// This should be only a fast path for coordinator
|
||||
// If we find previous session have same address as current , simply purge the old one so the recovery can be much faster
|
||||
func (s *Session) handleRestart(key string) {
|
||||
resp, err := s.etcdCli.Get(s.ctx, key)
|
||||
if err != nil {
|
||||
log.Warn("failed to read old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
for _, kv := range resp.Kvs {
|
||||
session := &Session{}
|
||||
err = json.Unmarshal(kv.Value, session)
|
||||
if err != nil {
|
||||
log.Warn("failed to unmarshal old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if session.Address == s.Address && session.ServerID < s.ServerID {
|
||||
log.Warn("find old session is same as current node, assume it as restart, purge old session", zap.String("key", key),
|
||||
zap.String("address", session.Address))
|
||||
_, err := s.etcdCli.Delete(s.ctx, key)
|
||||
if err != nil {
|
||||
log.Warn("failed to unmarshal old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processKeepAliveResponse processes the response of etcd keepAlive interface
|
||||
// If keepAlive fails for unexpected error, it will send a signal to the channel.
|
||||
func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) {
|
||||
|
|
|
@ -37,7 +37,7 @@ const (
|
|||
DefaultMiddlePriorityThreadCoreCoefficient = 5
|
||||
DefaultLowPriorityThreadCoreCoefficient = 1
|
||||
|
||||
DefaultSessionTTL = 60 // s
|
||||
DefaultSessionTTL = 30 // s
|
||||
DefaultSessionRetryTimes = 30
|
||||
|
||||
DefaultMaxDegree = 56
|
||||
|
|
Loading…
Reference in New Issue