mirror of https://github.com/milvus-io/milvus.git
enhance: Make etcd kv request timeout configurable (#28661)
See also #28660 This pr add request timeout config item for etcd kv request timeout Sync the default timeout value to same value for etcdKV & tikv config Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/28702/head
parent
e9e9b30e8e
commit
a2fe9dad49
|
@ -553,16 +553,19 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
|
|||
if s.meta != nil {
|
||||
return nil
|
||||
}
|
||||
s.watchClient = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
s.watchClient = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
metaType := Params.MetaStoreCfg.MetaStoreType.GetValue()
|
||||
log.Info("data coordinator connecting to metadata store", zap.String("metaType", metaType))
|
||||
metaRootPath := ""
|
||||
if metaType == util.MetaStoreTypeTiKV {
|
||||
metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
|
||||
s.kv = tikv.NewTiKV(s.tikvCli, metaRootPath)
|
||||
s.kv = tikv.NewTiKV(s.tikvCli, metaRootPath,
|
||||
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
} else if metaType == util.MetaStoreTypeEtcd {
|
||||
metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
|
||||
s.kv = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath)
|
||||
s.kv = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath,
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
} else {
|
||||
return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", metaType))
|
||||
}
|
||||
|
|
|
@ -354,7 +354,8 @@ func (node *DataNode) Start() error {
|
|||
}*/
|
||||
|
||||
connectEtcdFn := func() error {
|
||||
etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
node.watchKv = etcdKV
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -44,10 +44,12 @@ type EmbedEtcdKV struct {
|
|||
rootPath string
|
||||
etcd *embed.Etcd
|
||||
closeOnce sync.Once
|
||||
|
||||
requestTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewEmbededEtcdKV creates a new etcd kv.
|
||||
func NewEmbededEtcdKV(cfg *embed.Config, rootPath string) (*EmbedEtcdKV, error) {
|
||||
func NewEmbededEtcdKV(cfg *embed.Config, rootPath string, options ...Option) (*EmbedEtcdKV, error) {
|
||||
e, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -55,10 +57,17 @@ func NewEmbededEtcdKV(cfg *embed.Config, rootPath string) (*EmbedEtcdKV, error)
|
|||
|
||||
client := v3client.New(e.Server)
|
||||
|
||||
opt := defaultOption()
|
||||
for _, option := range options {
|
||||
option(opt)
|
||||
}
|
||||
|
||||
kv := &EmbedEtcdKV{
|
||||
client: client,
|
||||
rootPath: rootPath,
|
||||
etcd: e,
|
||||
|
||||
requestTimeout: opt.requestTimeout,
|
||||
}
|
||||
|
||||
// wait until embed etcd is ready
|
||||
|
@ -87,7 +96,7 @@ func (kv *EmbedEtcdKV) GetPath(key string) string {
|
|||
|
||||
func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
batch := int64(paginationSize)
|
||||
|
@ -123,7 +132,7 @@ func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func
|
|||
func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -143,7 +152,7 @@ func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
|||
func (kv *EmbedEtcdKV) Has(key string) (bool, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
log.Debug("Has", zap.String("key", key))
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key, clientv3.WithCountOnly())
|
||||
if err != nil {
|
||||
|
@ -156,7 +165,7 @@ func (kv *EmbedEtcdKV) HasPrefix(prefix string) (bool, error) {
|
|||
prefix = path.Join(kv.rootPath, prefix)
|
||||
log.Debug("HasPrefix", zap.String("prefix", prefix))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly(), clientv3.WithLimit(1))
|
||||
|
@ -171,7 +180,7 @@ func (kv *EmbedEtcdKV) HasPrefix(prefix string) (bool, error) {
|
|||
func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
log.Debug("LoadBytesWithPrefix ", zap.String("prefix", key))
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -191,7 +200,7 @@ func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, erro
|
|||
func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
log.Debug("LoadBytesWithPrefix2 ", zap.String("prefix", key))
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -212,7 +221,7 @@ func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []i
|
|||
// Load returns value of the given key
|
||||
func (kv *EmbedEtcdKV) Load(key string) (string, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key)
|
||||
if err != nil {
|
||||
|
@ -228,7 +237,7 @@ func (kv *EmbedEtcdKV) Load(key string) (string, error) {
|
|||
// LoadBytes returns value of the given key
|
||||
func (kv *EmbedEtcdKV) LoadBytes(key string) ([]byte, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key)
|
||||
if err != nil {
|
||||
|
@ -248,7 +257,7 @@ func (kv *EmbedEtcdKV) MultiLoad(keys []string) ([]string, error) {
|
|||
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
if err != nil {
|
||||
|
@ -284,7 +293,7 @@ func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
|
|||
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
if err != nil {
|
||||
|
@ -317,7 +326,7 @@ func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
|
|||
func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
log.Debug("LoadBytesWithRevision ", zap.String("prefix", key))
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -336,7 +345,7 @@ func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, in
|
|||
// Save saves the key-value pair.
|
||||
func (kv *EmbedEtcdKV) Save(key, value string) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, value)
|
||||
return err
|
||||
|
@ -345,7 +354,7 @@ func (kv *EmbedEtcdKV) Save(key, value string) error {
|
|||
// SaveBytes saves the key-value pair.
|
||||
func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, string(value))
|
||||
return err
|
||||
|
@ -354,7 +363,7 @@ func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error {
|
|||
// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
|
||||
func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id))
|
||||
return err
|
||||
|
@ -367,7 +376,7 @@ func (kv *EmbedEtcdKV) MultiSave(kvs map[string]string) error {
|
|||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
|
@ -381,7 +390,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
|||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
|
@ -391,7 +400,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
|||
// RemoveWithPrefix removes the keys with given prefix.
|
||||
func (kv *EmbedEtcdKV) RemoveWithPrefix(prefix string) error {
|
||||
key := path.Join(kv.rootPath, prefix)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Delete(ctx, key, clientv3.WithPrefix())
|
||||
|
@ -401,7 +410,7 @@ func (kv *EmbedEtcdKV) RemoveWithPrefix(prefix string) error {
|
|||
// Remove removes the key.
|
||||
func (kv *EmbedEtcdKV) Remove(key string) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Delete(ctx, key)
|
||||
|
@ -415,7 +424,7 @@ func (kv *EmbedEtcdKV) MultiRemove(keys []string) error {
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, key)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
|
@ -438,7 +447,7 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []st
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
|
||||
|
@ -463,7 +472,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
|
@ -504,7 +513,7 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, rem
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix()))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
|
||||
|
@ -529,7 +538,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix()))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
|
@ -539,7 +548,7 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte
|
|||
// CompareVersionAndSwap compares the existing key-value's version with version, and if
|
||||
// they are equal, the target is stored in etcd.
|
||||
func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Txn(ctx).If(
|
||||
clientv3.Compare(
|
||||
|
@ -556,7 +565,7 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target s
|
|||
// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if
|
||||
// they are equal, the target is stored in etcd.
|
||||
func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.client.Txn(ctx).If(
|
||||
clientv3.Compare(
|
||||
|
|
|
@ -34,21 +34,29 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// RequestTimeout is default timeout for etcd request.
|
||||
RequestTimeout = 10 * time.Second
|
||||
// defaultRequestTimeout is default timeout for etcd request.
|
||||
defaultRequestTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// etcdKV implements TxnKV interface, it supports to process multiple kvs in a transaction.
|
||||
type etcdKV struct {
|
||||
client *clientv3.Client
|
||||
rootPath string
|
||||
|
||||
requestTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewEtcdKV creates a new etcd kv.
|
||||
func NewEtcdKV(client *clientv3.Client, rootPath string) *etcdKV {
|
||||
func NewEtcdKV(client *clientv3.Client, rootPath string, options ...Option) *etcdKV {
|
||||
opt := defaultOption()
|
||||
for _, option := range options {
|
||||
option(opt)
|
||||
}
|
||||
kv := &etcdKV{
|
||||
client: client,
|
||||
rootPath: rootPath,
|
||||
|
||||
requestTimeout: opt.requestTimeout,
|
||||
}
|
||||
return kv
|
||||
}
|
||||
|
@ -76,7 +84,7 @@ func (kv *etcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]by
|
|||
|
||||
key := prefix
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key, opts...)
|
||||
if err != nil {
|
||||
|
@ -104,7 +112,7 @@ func (kv *etcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]by
|
|||
func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -124,7 +132,7 @@ func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
|||
func (kv *etcdKV) Has(key string) (bool, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithCountOnly())
|
||||
|
@ -139,7 +147,7 @@ func (kv *etcdKV) Has(key string) (bool, error) {
|
|||
func (kv *etcdKV) HasPrefix(prefix string) (bool, error) {
|
||||
start := time.Now()
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.getEtcdMeta(ctx, prefix, clientv3.WithPrefix(), clientv3.WithLimit(1), clientv3.WithCountOnly())
|
||||
|
@ -155,7 +163,7 @@ func (kv *etcdKV) HasPrefix(prefix string) (bool, error) {
|
|||
func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -176,7 +184,7 @@ func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
|||
func (kv *etcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -199,7 +207,7 @@ func (kv *etcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64,
|
|||
func (kv *etcdKV) Load(key string) (string, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key)
|
||||
if err != nil {
|
||||
|
@ -216,7 +224,7 @@ func (kv *etcdKV) Load(key string) (string, error) {
|
|||
func (kv *etcdKV) LoadBytes(key string) ([]byte, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key)
|
||||
if err != nil {
|
||||
|
@ -237,7 +245,7 @@ func (kv *etcdKV) MultiLoad(keys []string) ([]string, error) {
|
|||
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
|
||||
if err != nil {
|
||||
|
@ -272,7 +280,7 @@ func (kv *etcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
|
|||
ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
|
||||
if err != nil {
|
||||
|
@ -303,7 +311,7 @@ func (kv *etcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
|
|||
func (kv *etcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithPrefix(),
|
||||
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
|
@ -324,7 +332,7 @@ func (kv *etcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64,
|
|||
func (kv *etcdKV) Save(key, value string) error {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.putEtcdMeta(ctx, key, value)
|
||||
|
@ -336,7 +344,7 @@ func (kv *etcdKV) Save(key, value string) error {
|
|||
func (kv *etcdKV) SaveBytes(key string, value []byte) error {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.putEtcdMeta(ctx, key, string(value))
|
||||
|
@ -348,7 +356,7 @@ func (kv *etcdKV) SaveBytes(key string, value []byte) error {
|
|||
func (kv *etcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.putEtcdMeta(ctx, key, string(value), clientv3.WithLease(id))
|
||||
|
@ -366,7 +374,7 @@ func (kv *etcdKV) MultiSave(kvs map[string]string) error {
|
|||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
CheckTnxStringValueSizeAndWarn(kvs)
|
||||
|
@ -388,7 +396,7 @@ func (kv *etcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
|||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
|
@ -404,7 +412,7 @@ func (kv *etcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
|||
func (kv *etcdKV) RemoveWithPrefix(prefix string) error {
|
||||
start := time.Now()
|
||||
key := path.Join(kv.rootPath, prefix)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.removeEtcdMeta(ctx, key, clientv3.WithPrefix())
|
||||
|
@ -416,7 +424,7 @@ func (kv *etcdKV) RemoveWithPrefix(prefix string) error {
|
|||
func (kv *etcdKV) Remove(key string) error {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.removeEtcdMeta(ctx, key)
|
||||
|
@ -432,7 +440,7 @@ func (kv *etcdKV) MultiRemove(keys []string) error {
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, key)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
|
||||
|
@ -462,7 +470,7 @@ func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string,
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, cmps...), ops...)
|
||||
|
@ -497,7 +505,7 @@ func (kv *etcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []st
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
|
||||
|
@ -559,7 +567,7 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix()))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, cmps...), ops...)
|
||||
|
@ -593,7 +601,7 @@ func (kv *etcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem
|
|||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix()))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...)
|
||||
|
@ -613,7 +621,7 @@ func (kv *etcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem
|
|||
// they are equal, the target is stored in etcd.
|
||||
func (kv *etcdKV) CompareVersionAndSwap(key string, source int64, target string) (bool, error) {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx,
|
||||
clientv3.Compare(clientv3.Version(path.Join(kv.rootPath, key)), "=", source)),
|
||||
|
@ -629,7 +637,7 @@ func (kv *etcdKV) CompareVersionAndSwap(key string, source int64, target string)
|
|||
// they are equal, the target is stored in etcd.
|
||||
func (kv *etcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx,
|
||||
clientv3.Compare(clientv3.Version(path.Join(kv.rootPath, key)), "=", source)),
|
||||
|
@ -680,7 +688,7 @@ func CheckTnxStringValueSizeAndWarn(kvs map[string]string) bool {
|
|||
}
|
||||
|
||||
func (kv *etcdKV) getEtcdMeta(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
||||
ctx1, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
ctx1, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
start := timerecord.NewTimeRecorder("getEtcdMeta")
|
||||
|
@ -704,7 +712,7 @@ func (kv *etcdKV) getEtcdMeta(ctx context.Context, key string, opts ...clientv3.
|
|||
}
|
||||
|
||||
func (kv *etcdKV) putEtcdMeta(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
|
||||
ctx1, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
ctx1, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
start := timerecord.NewTimeRecorder("putEtcdMeta")
|
||||
|
@ -723,7 +731,7 @@ func (kv *etcdKV) putEtcdMeta(ctx context.Context, key, val string, opts ...clie
|
|||
}
|
||||
|
||||
func (kv *etcdKV) removeEtcdMeta(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
|
||||
ctx1, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
ctx1, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
start := timerecord.NewTimeRecorder("removeEtcdMeta")
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package etcdkv
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -45,7 +47,7 @@ func NewWatchKVFactory(rootPath string, etcdCfg *paramtable.EtcdConfig) (kv.Watc
|
|||
cfg = embed.NewConfig()
|
||||
}
|
||||
cfg.Dir = etcdCfg.DataDir.GetValue()
|
||||
watchKv, err := NewEmbededEtcdKV(cfg, rootPath)
|
||||
watchKv, err := NewEmbededEtcdKV(cfg, rootPath, WithRequestTimeout(etcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -62,7 +64,8 @@ func NewWatchKVFactory(rootPath string, etcdCfg *paramtable.EtcdConfig) (kv.Watc
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
watchKv := NewEtcdKV(client, rootPath)
|
||||
watchKv := NewEtcdKV(client, rootPath,
|
||||
WithRequestTimeout(etcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
return watchKv, err
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdkv
|
||||
|
||||
import "time"
|
||||
|
||||
type etcdOpt struct {
|
||||
requestTimeout time.Duration
|
||||
}
|
||||
|
||||
type Option func(*etcdOpt)
|
||||
|
||||
func WithRequestTimeout(timeout time.Duration) Option {
|
||||
return func(opt *etcdOpt) {
|
||||
opt.requestTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
func defaultOption() *etcdOpt {
|
||||
return &etcdOpt{
|
||||
requestTimeout: defaultRequestTimeout,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
|
||||
import "time"
|
||||
|
||||
type tikvOpt struct {
|
||||
requestTimeout time.Duration
|
||||
}
|
||||
|
||||
type Option func(*tikvOpt)
|
||||
|
||||
func WithRequestTimeout(timeout time.Duration) Option {
|
||||
return func(opt *tikvOpt) {
|
||||
opt.requestTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
func defaultOption() *tikvOpt {
|
||||
return &tikvOpt{
|
||||
requestTimeout: defaultRequestTimeout,
|
||||
}
|
||||
}
|
|
@ -63,8 +63,10 @@ var Params *paramtable.ComponentParam = paramtable.Get()
|
|||
// For reads by prefix we can customize the scan size to increase/decrease rpc calls.
|
||||
var SnapshotScanSize int
|
||||
|
||||
// RequestTimeout is the default timeout for tikv request.
|
||||
var RequestTimeout time.Duration
|
||||
// defaultRequestTimeout is the default timeout for tikv request.
|
||||
const (
|
||||
defaultRequestTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
var EmptyValueByte = []byte(EmptyValueString)
|
||||
|
||||
|
@ -95,15 +97,23 @@ var _ kv.MetaKv = (*txnTiKV)(nil)
|
|||
type txnTiKV struct {
|
||||
txn *txnkv.Client
|
||||
rootPath string
|
||||
|
||||
requestTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewTiKV creates a new txnTiKV client.
|
||||
func NewTiKV(txn *txnkv.Client, rootPath string) *txnTiKV {
|
||||
func NewTiKV(txn *txnkv.Client, rootPath string, options ...Option) *txnTiKV {
|
||||
SnapshotScanSize = Params.TiKVCfg.SnapshotScanSize.GetAsInt()
|
||||
RequestTimeout = Params.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)
|
||||
|
||||
opt := defaultOption()
|
||||
for _, option := range options {
|
||||
option(opt)
|
||||
}
|
||||
|
||||
kv := &txnTiKV{
|
||||
txn: txn,
|
||||
rootPath: rootPath,
|
||||
txn: txn,
|
||||
rootPath: rootPath,
|
||||
requestTimeout: opt.requestTimeout,
|
||||
}
|
||||
return kv
|
||||
}
|
||||
|
@ -131,7 +141,7 @@ func logWarnOnFailure(err *error, msg string, fields ...zap.Field) {
|
|||
func (kv *txnTiKV) Has(key string) (bool, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -190,7 +200,7 @@ func (kv *txnTiKV) HasPrefix(prefix string) (bool, error) {
|
|||
func (kv *txnTiKV) Load(key string) (string, error) {
|
||||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -221,7 +231,7 @@ func batchConvertFromString(prefix string, keys []string) [][]byte {
|
|||
// MultiLoad gets the values of input keys in a transaction.
|
||||
func (kv *txnTiKV) MultiLoad(keys []string) ([]string, error) {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -303,7 +313,7 @@ func (kv *txnTiKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
|||
// Save saves the input key-value pair.
|
||||
func (kv *txnTiKV) Save(key, value string) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -316,7 +326,7 @@ func (kv *txnTiKV) Save(key, value string) error {
|
|||
// MultiSave saves the input key-value pairs in transaction manner.
|
||||
func (kv *txnTiKV) MultiSave(kvs map[string]string) error {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -358,7 +368,7 @@ func (kv *txnTiKV) MultiSave(kvs map[string]string) error {
|
|||
// Remove removes the input key.
|
||||
func (kv *txnTiKV) Remove(key string) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -371,7 +381,7 @@ func (kv *txnTiKV) Remove(key string) error {
|
|||
// MultiRemove removes the input keys in transaction manner.
|
||||
func (kv *txnTiKV) MultiRemove(keys []string) error {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -408,7 +418,7 @@ func (kv *txnTiKV) MultiRemove(keys []string) error {
|
|||
func (kv *txnTiKV) RemoveWithPrefix(prefix string) error {
|
||||
start := time.Now()
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -428,7 +438,7 @@ func (kv *txnTiKV) RemoveWithPrefix(prefix string) error {
|
|||
// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction.
|
||||
func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -491,7 +501,7 @@ func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string
|
|||
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
|
||||
func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var loggingErr error
|
||||
|
@ -635,7 +645,7 @@ func (kv *txnTiKV) executeTxn(ctx context.Context, txn *transaction.KVTxn) error
|
|||
}
|
||||
|
||||
func (kv *txnTiKV) getTiKVMeta(ctx context.Context, key string) (string, error) {
|
||||
ctx1, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
ctx1, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
start := timerecord.NewTimeRecorder("getTiKVMeta")
|
||||
|
@ -668,7 +678,7 @@ func (kv *txnTiKV) getTiKVMeta(ctx context.Context, key string) (string, error)
|
|||
}
|
||||
|
||||
func (kv *txnTiKV) putTiKVMeta(ctx context.Context, key, val string) error {
|
||||
ctx1, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
ctx1, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
start := timerecord.NewTimeRecorder("putTiKVMeta")
|
||||
|
@ -705,7 +715,7 @@ func (kv *txnTiKV) putTiKVMeta(ctx context.Context, key, val string) error {
|
|||
}
|
||||
|
||||
func (kv *txnTiKV) removeTiKVMeta(ctx context.Context, key string) error {
|
||||
ctx1, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
ctx1, cancel := context.WithTimeout(ctx, kv.requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
start := timerecord.NewTimeRecorder("removeTiKVMeta")
|
||||
|
|
|
@ -214,10 +214,12 @@ func (s *Server) initQueryCoord() error {
|
|||
var idAllocatorKV kv.TxnKV
|
||||
log.Info(fmt.Sprintf("query coordinator connecting to %s.", metaType))
|
||||
if metaType == util.MetaStoreTypeTiKV {
|
||||
s.kv = tikv.NewTiKV(s.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue())
|
||||
s.kv = tikv.NewTiKV(s.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(),
|
||||
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
idAllocatorKV = tsoutil.NewTSOTiKVBase(s.tikvCli, Params.TiKVCfg.KvRootPath.GetValue(), "querycoord-id-allocator")
|
||||
} else if metaType == util.MetaStoreTypeEtcd {
|
||||
s.kv = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
|
||||
s.kv = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
|
||||
idAllocatorKV = tsoutil.NewTSOKVBase(s.etcdCli, Params.EtcdCfg.KvRootPath.GetValue(), "querycoord-id-allocator")
|
||||
} else {
|
||||
return fmt.Errorf("not supported meta store: %s", metaType)
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
|
@ -29,7 +30,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -78,7 +78,7 @@ func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) {
|
|||
|
||||
// WatchProxy starts a goroutine to watch proxy session changes on etcd
|
||||
func (p *proxyManager) WatchProxy() error {
|
||||
ctx, cancel := context.WithTimeout(p.ctx, etcdkv.RequestTimeout)
|
||||
ctx, cancel := context.WithTimeout(p.ctx, Params.ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
sessions, rev, err := p.getSessionsOnEtcd(ctx)
|
||||
|
|
|
@ -321,11 +321,13 @@ func (c *Core) initKVCreator() {
|
|||
if c.metaKVCreator == nil {
|
||||
if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
|
||||
c.metaKVCreator = func() (kv.MetaKv, error) {
|
||||
return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue()), nil
|
||||
return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(),
|
||||
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
|
||||
}
|
||||
} else {
|
||||
c.metaKVCreator = func() (kv.MetaKv, error) {
|
||||
return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()), nil
|
||||
return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
|
||||
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ type EtcdConfig struct {
|
|||
EtcdTLSKey ParamItem `refreshable:"false"`
|
||||
EtcdTLSCACert ParamItem `refreshable:"false"`
|
||||
EtcdTLSMinVersion ParamItem `refreshable:"false"`
|
||||
RequestTimeout ParamItem `refreshable:"false"`
|
||||
|
||||
// --- Embed ETCD ---
|
||||
UseEmbedEtcd ParamItem `refreshable:"false"`
|
||||
|
@ -257,6 +258,15 @@ We recommend using version 1.2 and above.`,
|
|||
Export: true,
|
||||
}
|
||||
p.EtcdTLSMinVersion.Init(base.mgr)
|
||||
|
||||
p.RequestTimeout = ParamItem{
|
||||
Key: "etcd.requestTimeout",
|
||||
DefaultValue: "10000",
|
||||
Version: "2.3.4",
|
||||
Doc: `Etcd operation timeout in milliseconds`,
|
||||
Export: true,
|
||||
}
|
||||
p.RequestTimeout.Init(base.mgr)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -268,7 +278,7 @@ type TiKVConfig struct {
|
|||
KvSubPath ParamItem `refreshable:"false"`
|
||||
MetaRootPath CompositeParamItem `refreshable:"false"`
|
||||
KvRootPath CompositeParamItem `refreshable:"false"`
|
||||
RequestTimeout ParamItem `refreshable:"true"`
|
||||
RequestTimeout ParamItem `refreshable:"false"`
|
||||
SnapshotScanSize ParamItem `refreshable:"true"`
|
||||
TiKVUseSSL ParamItem `refreshable:"false"`
|
||||
TiKVTLSCert ParamItem `refreshable:"false"`
|
||||
|
|
Loading…
Reference in New Issue