mirror of https://github.com/milvus-io/milvus.git
Improve slow Etcd logs by specifying keys (#18547)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>pull/18575/head
parent
12023e8569
commit
097f144db1
|
@ -75,7 +75,7 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
|||
keys = append(keys, string(kv.Key))
|
||||
values = append(values, string(kv.Value))
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix", zap.Strings("keys", keys))
|
||||
return keys, values, nil
|
||||
}
|
||||
|
||||
|
@ -96,7 +96,7 @@ func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
|||
keys = append(keys, string(kv.Key))
|
||||
values = append(values, kv.Value)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix", zap.Strings("keys", keys))
|
||||
return keys, values, nil
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, erro
|
|||
values = append(values, string(kv.Value))
|
||||
versions = append(versions, kv.Version)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys))
|
||||
return keys, values, versions, nil
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,7 @@ func (kv *EtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, [
|
|||
values = append(values, string(kv.Value))
|
||||
versions = append(versions, kv.Version)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys))
|
||||
return keys, values, versions, resp.Header.Revision, nil
|
||||
}
|
||||
|
||||
|
@ -164,7 +164,7 @@ func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64,
|
|||
values = append(values, kv.Value)
|
||||
versions = append(versions, kv.Version)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys))
|
||||
return keys, values, versions, nil
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ func (kv *EtcdKV) Load(key string) (string, error) {
|
|||
if resp.Count <= 0 {
|
||||
return "", fmt.Errorf("there is no value on key = %s", key)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load", zap.String("key", key))
|
||||
return string(resp.Kvs[0].Value), nil
|
||||
}
|
||||
|
||||
|
@ -198,7 +198,7 @@ func (kv *EtcdKV) LoadBytes(key string) ([]byte, error) {
|
|||
if resp.Count <= 0 {
|
||||
return []byte{}, fmt.Errorf("there is no value on key = %s", key)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load", zap.String("key", key))
|
||||
return resp.Kvs[0].Value, nil
|
||||
}
|
||||
|
||||
|
@ -233,7 +233,7 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
|
|||
err = fmt.Errorf("there are invalid keys: %s", invalid)
|
||||
return result, err
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi load")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi load", zap.Any("keys", keys))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
@ -268,7 +268,7 @@ func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
|
|||
err = fmt.Errorf("there are invalid keys: %s", invalid)
|
||||
return result, err
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi load")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi load", zap.Strings("keys", keys))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
@ -289,7 +289,7 @@ func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error
|
|||
keys = append(keys, string(kv.Key))
|
||||
values = append(values, string(kv.Value))
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with revision")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with revision", zap.Strings("keys", keys))
|
||||
return keys, values, resp.Header.Revision, nil
|
||||
}
|
||||
|
||||
|
@ -310,7 +310,7 @@ func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64,
|
|||
keys = append(keys, string(kv.Key))
|
||||
values = append(values, kv.Value)
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with revision")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation load with revision", zap.Strings("keys", keys))
|
||||
return keys, values, resp.Header.Revision, nil
|
||||
}
|
||||
|
||||
|
@ -321,7 +321,7 @@ func (kv *EtcdKV) Save(key, value string) error {
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, value)
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save", zap.String("key", key))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -332,7 +332,7 @@ func (kv *EtcdKV) SaveBytes(key string, value []byte) error {
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, string(value))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save", zap.String("key", key))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -344,7 +344,7 @@ func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -356,7 +356,7 @@ func (kv *EtcdKV) SaveWithIgnoreLease(key, value string) error {
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease())
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -367,7 +367,7 @@ func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.Lease
|
|||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
_, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -375,7 +375,9 @@ func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.Lease
|
|||
func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(kvs))
|
||||
var keys []string
|
||||
for key, value := range kvs {
|
||||
keys = append(keys, key)
|
||||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
|
||||
}
|
||||
|
||||
|
@ -383,7 +385,7 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -391,7 +393,9 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
|
|||
func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(kvs))
|
||||
var keys []string
|
||||
for key, value := range kvs {
|
||||
keys = append(keys, key)
|
||||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
|
||||
}
|
||||
|
||||
|
@ -399,7 +403,7 @@ func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -411,7 +415,7 @@ func (kv *EtcdKV) RemoveWithPrefix(prefix string) error {
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Delete(ctx, key, clientv3.WithPrefix())
|
||||
CheckElapseAndWarn(start, "Slow etcd operation remove with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation remove with prefix", zap.String("prefix", prefix))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -423,7 +427,7 @@ func (kv *EtcdKV) Remove(key string) error {
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Delete(ctx, key)
|
||||
CheckElapseAndWarn(start, "Slow etcd operation remove")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation remove", zap.String("key", key))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -439,7 +443,7 @@ func (kv *EtcdKV) MultiRemove(keys []string) error {
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi remove")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi remove", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -447,7 +451,9 @@ func (kv *EtcdKV) MultiRemove(keys []string) error {
|
|||
func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
||||
var keys []string
|
||||
for key, value := range saves {
|
||||
keys = append(keys, key)
|
||||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
|
||||
}
|
||||
|
||||
|
@ -459,7 +465,7 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and remove")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and remove", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -467,7 +473,9 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
|
|||
func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
||||
var keys []string
|
||||
for key, value := range saves {
|
||||
keys = append(keys, key)
|
||||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
|
||||
}
|
||||
|
||||
|
@ -479,7 +487,7 @@ func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []st
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and remove")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and remove", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -488,7 +496,7 @@ func (kv *EtcdKV) Watch(key string) clientv3.WatchChan {
|
|||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
rch := kv.client.Watch(context.Background(), key, clientv3.WithCreatedNotify())
|
||||
CheckElapseAndWarn(start, "Slow etcd operation watch")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation watch", zap.String("key", key))
|
||||
return rch
|
||||
}
|
||||
|
||||
|
@ -497,7 +505,7 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
|
|||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
|
||||
CheckElapseAndWarn(start, "Slow etcd operation watch with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation watch with prefix", zap.String("key", key))
|
||||
return rch
|
||||
}
|
||||
|
||||
|
@ -506,7 +514,7 @@ func (kv *EtcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchCh
|
|||
start := time.Now()
|
||||
key = path.Join(kv.rootPath, key)
|
||||
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation watch with revision")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation watch with revision", zap.String("key", key))
|
||||
return rch
|
||||
}
|
||||
|
||||
|
@ -522,7 +530,7 @@ func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error {
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi remove with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi remove with prefix", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -530,7 +538,9 @@ func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error {
|
|||
func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(saves))
|
||||
var keys []string
|
||||
for key, value := range saves {
|
||||
keys = append(keys, key)
|
||||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value))
|
||||
}
|
||||
|
||||
|
@ -542,7 +552,7 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -550,7 +560,9 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
|
|||
func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error {
|
||||
start := time.Now()
|
||||
ops := make([]clientv3.Op, 0, len(saves))
|
||||
var keys []string
|
||||
for key, value := range saves {
|
||||
keys = append(keys, key)
|
||||
ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value)))
|
||||
}
|
||||
|
||||
|
@ -562,7 +574,7 @@ func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem
|
|||
defer cancel()
|
||||
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix", zap.Strings("keys", keys))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -601,7 +613,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap", zap.String("key", key))
|
||||
return resp.Succeeded, nil
|
||||
}
|
||||
|
||||
|
@ -620,7 +632,7 @@ func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opt
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap", zap.String("key", key))
|
||||
return resp.Succeeded, nil
|
||||
}
|
||||
|
||||
|
@ -639,7 +651,7 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string,
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap", zap.String("key", key))
|
||||
return resp.Succeeded, nil
|
||||
}
|
||||
|
||||
|
@ -658,15 +670,15 @@ func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap")
|
||||
CheckElapseAndWarn(start, "Slow etcd operation compare version and swap", zap.String("key", key))
|
||||
return resp.Succeeded, nil
|
||||
}
|
||||
|
||||
// CheckElapseAndWarn checks the elapsed time and warns if it is too long.
|
||||
func CheckElapseAndWarn(start time.Time, message string) bool {
|
||||
func CheckElapseAndWarn(start time.Time, message string, fields ...zap.Field) bool {
|
||||
elapsed := time.Since(start)
|
||||
if elapsed.Milliseconds() > 2000 {
|
||||
log.Warn(message, zap.String("time spent", elapsed.String()))
|
||||
log.Warn(message, append([]zap.Field{zap.String("time spent", elapsed.String())}, fields...)...)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
Loading…
Reference in New Issue