mirror of https://github.com/milvus-io/milvus.git
parent
42be531f9c
commit
2e37fca214
|
@ -60,6 +60,7 @@ func NewSession(ctx context.Context, metaRoot string, etcdEndpoints []string) *S
|
|||
}
|
||||
|
||||
connectEtcdFn := func() error {
|
||||
log.Debug("Session try to connect to etcd")
|
||||
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -71,6 +72,7 @@ func NewSession(ctx context.Context, metaRoot string, etcdEndpoints []string) *S
|
|||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
log.Debug("Sessiont connect to etcd success")
|
||||
return session
|
||||
}
|
||||
|
||||
|
@ -109,40 +111,40 @@ func (s *Session) checkIDExist() {
|
|||
}
|
||||
|
||||
func (s *Session) getServerIDWithKey(key string, retryTimes uint) (int64, error) {
|
||||
res := int64(0)
|
||||
getServerIDWithKeyFn := func() error {
|
||||
for {
|
||||
log.Debug("Session try to get servdeID")
|
||||
getResp, err := s.etcdCli.Get(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, key))
|
||||
if err != nil {
|
||||
return nil
|
||||
log.Debug("Session get etcd key error", zap.String("key", key), zap.Error(err))
|
||||
return -1, err
|
||||
}
|
||||
if getResp.Count <= 0 {
|
||||
return fmt.Errorf("there is no value on key = %s", key)
|
||||
log.Debug("Session there is no value", zap.String("key", key))
|
||||
continue
|
||||
}
|
||||
value := string(getResp.Kvs[0].Value)
|
||||
valueInt, err := strconv.ParseInt(value, 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("session", zap.Error(err))
|
||||
return err
|
||||
log.Debug("Session ParseInt error", zap.String("value", value), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
txnResp, err := s.etcdCli.Txn(s.ctx).If(
|
||||
clientv3.Compare(
|
||||
clientv3.Value(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)),
|
||||
clientv3.Value(path.Join(s.metaRoot, DefaultServiceRoot, key)),
|
||||
"=",
|
||||
value)).
|
||||
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), strconv.FormatInt(valueInt+1, 10))).Commit()
|
||||
Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, key), strconv.FormatInt(valueInt+1, 10))).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
log.Debug("Session Txn failed", zap.String("key", key), zap.Error(err))
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if !txnResp.Succeeded {
|
||||
return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key)
|
||||
log.Debug("Session Txn unsuccess", zap.String("key", key))
|
||||
continue
|
||||
}
|
||||
res = valueInt
|
||||
return nil
|
||||
return valueInt, nil
|
||||
}
|
||||
|
||||
err := retry.Do(s.ctx, getServerIDWithKeyFn, retry.Attempts(retryTimes), retry.Sleep(500*time.Millisecond))
|
||||
return res, err
|
||||
}
|
||||
|
||||
// registerService registers the service to etcd so that other services
|
||||
|
|
Loading…
Reference in New Issue