mirror of https://github.com/milvus-io/milvus.git
backport load config timeout (#21784)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/21838/head
parent
709bf35b9a
commit
76ad254542
|
@ -34,6 +34,10 @@ const (
|
|||
ModeInterval
|
||||
)
|
||||
|
||||
const (
|
||||
ReadConfigTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
type EtcdSource struct {
|
||||
sync.RWMutex
|
||||
etcdCli *clientv3.Client
|
||||
|
@ -120,7 +124,9 @@ func (es *EtcdSource) Close() {
|
|||
|
||||
func (es *EtcdSource) refreshConfigurations() error {
|
||||
prefix := es.keyPrefix + "/config"
|
||||
response, err := es.etcdCli.Get(es.ctx, prefix, clientv3.WithPrefix())
|
||||
ctx, cancel := context.WithTimeout(es.ctx, ReadConfigTimeout)
|
||||
defer cancel()
|
||||
response, err := es.etcdCli.Get(ctx, prefix, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import (
|
|||
|
||||
const (
|
||||
// RequestTimeout is default timeout for etcd request.
|
||||
RequestTimeout = 10 * time.Second
|
||||
RequestTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// EtcdKV implements TxnKV interface, it supports to process multiple kvs in a transaction.
|
||||
|
@ -432,7 +432,7 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
CheckTnxStringValueSizeAndWarn(kvs)
|
||||
CheckTxnStringValueSizeAndWarn(kvs)
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save", zap.Strings("keys", keys))
|
||||
return err
|
||||
|
@ -451,7 +451,7 @@ func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
CheckTxnBytesValueSizeAndWarn(kvs)
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save", zap.Strings("keys", keys))
|
||||
return err
|
||||
|
@ -542,6 +542,7 @@ func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []st
|
|||
}
|
||||
|
||||
// Watch starts watching a key, returns a watch channel.
|
||||
// Watch related can not set timeout
|
||||
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
|
@ -631,7 +632,10 @@ func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem
|
|||
// Grant creates a new lease implemented in etcd grant interface.
|
||||
func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
|
||||
start := time.Now()
|
||||
resp, err := kv.client.Grant(context.Background(), ttl)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.client.Grant(ctx, ttl)
|
||||
CheckElapseAndWarn(start, "Slow etcd operation grant")
|
||||
return resp.ID, err
|
||||
}
|
||||
|
@ -745,7 +749,7 @@ func CheckValueSizeAndWarn(key string, value interface{}) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func CheckTnxBytesValueSizeAndWarn(kvs map[string][]byte) bool {
|
||||
func CheckTxnBytesValueSizeAndWarn(kvs map[string][]byte) bool {
|
||||
var hasWarn bool
|
||||
for key, value := range kvs {
|
||||
if CheckValueSizeAndWarn(key, value) {
|
||||
|
@ -755,11 +759,11 @@ func CheckTnxBytesValueSizeAndWarn(kvs map[string][]byte) bool {
|
|||
return hasWarn
|
||||
}
|
||||
|
||||
func CheckTnxStringValueSizeAndWarn(kvs map[string]string) bool {
|
||||
func CheckTxnStringValueSizeAndWarn(kvs map[string]string) bool {
|
||||
newKvs := make(map[string][]byte, len(kvs))
|
||||
for key, value := range kvs {
|
||||
newKvs[key] = []byte(value)
|
||||
}
|
||||
|
||||
return CheckTnxBytesValueSizeAndWarn(newKvs)
|
||||
return CheckTxnBytesValueSizeAndWarn(newKvs)
|
||||
}
|
||||
|
|
|
@ -1020,21 +1020,21 @@ func TestCheckValueSizeAndWarn(t *testing.T) {
|
|||
func TestCheckTnxBytesValueSizeAndWarn(t *testing.T) {
|
||||
kvs := make(map[string][]byte, 0)
|
||||
kvs["k"] = []byte("v")
|
||||
ret := etcdkv.CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
ret := etcdkv.CheckTxnBytesValueSizeAndWarn(kvs)
|
||||
assert.False(t, ret)
|
||||
|
||||
kvs["k"] = make([]byte, 1024000)
|
||||
ret = etcdkv.CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
ret = etcdkv.CheckTxnBytesValueSizeAndWarn(kvs)
|
||||
assert.True(t, ret)
|
||||
}
|
||||
|
||||
func TestCheckTnxStringValueSizeAndWarn(t *testing.T) {
|
||||
kvs := make(map[string]string, 0)
|
||||
kvs["k"] = "v"
|
||||
ret := etcdkv.CheckTnxStringValueSizeAndWarn(kvs)
|
||||
ret := etcdkv.CheckTxnStringValueSizeAndWarn(kvs)
|
||||
assert.False(t, ret)
|
||||
|
||||
kvs["k1"] = funcutil.RandomString(1024000)
|
||||
ret = etcdkv.CheckTnxStringValueSizeAndWarn(kvs)
|
||||
ret = etcdkv.CheckTxnStringValueSizeAndWarn(kvs)
|
||||
assert.True(t, ret)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue