support metrics mutex to monitor cost of locks(#26102) (#26103)

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/25802/head
MrPresent-Han 2023-08-03 15:03:06 +08:00 committed by GitHub
parent 68ecf49ed5
commit 47392b0a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 227 additions and 0 deletions

View File

@ -489,6 +489,13 @@ common:
ImportMaxFileSize: 17179869184 # 16 * 1024 * 1024 * 1024
# max file size to import for bulkInsert
locks:
metrics:
enable: false
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
# 1. TT protection;

View File

@ -84,6 +84,10 @@ const (
requestScope = "scope"
fullMethodLabelName = "full_method"
reduceLevelName = "reduce_level"
lockName = "lock_name"
lockSource = "lock_source"
lockType = "lock_type"
lockOp = "lock_op"
)
var (
@ -97,9 +101,22 @@ var (
Name: "num_node",
Help: "number of nodes and coordinates",
}, []string{nodeIDLabelName, roleNameLabelName})
LockCosts = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Name: "lock_time_cost",
Help: "time cost for various kinds of locks",
}, []string{
lockName,
lockSource,
lockType,
lockOp,
})
)
// Register serves prometheus http service
func Register(r *prometheus.Registry) {
r.MustRegister(NumNodes)
r.MustRegister(LockCosts)
}

View File

@ -0,0 +1,102 @@
package lock
import (
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"go.uber.org/zap"
)
type MetricsLockManager struct {
rwLocks map[string]*MetricsRWMutex
}
type MetricsRWMutex struct {
mutex sync.RWMutex
lockName string
acquireTimeMap map[string]time.Time
}
const (
readLock = "READ_LOCK"
writeLock = "WRITE_LOCK"
hold = "HOLD"
acquire = "ACQUIRE"
)
func (mRWLock *MetricsRWMutex) RLock(source string) {
if paramtable.Get().CommonCfg.EnableLockMetrics.GetAsBool() {
before := time.Now()
mRWLock.mutex.RLock()
mRWLock.acquireTimeMap[source] = time.Now()
logLock(time.Since(before), mRWLock.lockName, source, readLock, acquire)
} else {
mRWLock.mutex.RLock()
}
}
func (mRWLock *MetricsRWMutex) Lock(source string) {
if paramtable.Get().CommonCfg.EnableLockMetrics.GetAsBool() {
before := time.Now()
mRWLock.mutex.Lock()
mRWLock.acquireTimeMap[source] = time.Now()
logLock(time.Since(before), mRWLock.lockName, source, writeLock, acquire)
} else {
mRWLock.mutex.Lock()
}
}
func (mRWLock *MetricsRWMutex) UnLock(source string) {
if mRWLock.maybeLogUnlockDuration(source, writeLock) != nil {
return
}
mRWLock.mutex.Unlock()
}
func (mRWLock *MetricsRWMutex) RUnLock(source string) {
if mRWLock.maybeLogUnlockDuration(source, readLock) != nil {
return
}
mRWLock.mutex.RUnlock()
}
func (mRWLock *MetricsRWMutex) maybeLogUnlockDuration(source string, lockType string) error {
if paramtable.Get().CommonCfg.EnableLockMetrics.GetAsBool() {
acquireTime, ok := mRWLock.acquireTimeMap[source]
if ok {
logLock(time.Since(acquireTime), mRWLock.lockName, source, lockType, hold)
delete(mRWLock.acquireTimeMap, source)
} else {
log.Error("there's no lock history for the source, there may be some defects in codes",
zap.String("source", source))
return errors.New("unknown source")
}
}
return nil
}
func logLock(duration time.Duration, lockName string, source string, lockType string, opType string) {
if duration >= paramtable.Get().CommonCfg.LockSlowLogWarnThreshold.GetAsDuration(time.Millisecond) {
log.Warn("lock takes too long", zap.String("lockName", lockName), zap.String("lockType", lockType),
zap.String("source", source), zap.String("opType", opType),
zap.Duration("time_cost", duration))
} else if duration >= paramtable.Get().CommonCfg.LockSlowLogInfoThreshold.GetAsDuration(time.Millisecond) {
log.Info("lock takes too long", zap.String("lockName", lockName), zap.String("lockType", lockType),
zap.String("source", source), zap.String("opType", opType),
zap.Duration("time_cost", duration))
}
metrics.LockCosts.WithLabelValues(lockName, source, lockType, opType).Set(float64(duration.Milliseconds()))
}
// currently, we keep metricsLockManager as a communal gate for metrics lock
// we may use this manager as a centralized statistical site to provide overall cost for locks
func (mlManager *MetricsLockManager) applyRWLock(name string) *MetricsRWMutex {
return &MetricsRWMutex{
lockName: name,
acquireTimeMap: make(map[string]time.Time, 0),
}
}

View File

@ -0,0 +1,69 @@
package lock
import (
"sync"
"testing"
"time"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/stretchr/testify/assert"
)
func TestMetricsLockLock(t *testing.T) {
lManager := &MetricsLockManager{
rwLocks: make(map[string]*MetricsRWMutex, 0),
}
params.Params.Init()
params.Params.Save(params.Params.CommonCfg.EnableLockMetrics.Key, "true")
params.Params.Save(params.Params.CommonCfg.LockSlowLogInfoThreshold.Key, "10")
lName := "testLock"
lockDuration := 10 * time.Millisecond
testRWLock := lManager.applyRWLock(lName)
wg := sync.WaitGroup{}
testRWLock.Lock("main_thread")
go func() {
wg.Add(1)
before := time.Now()
testRWLock.Lock("sub_thread")
lkDuration := time.Since(before)
assert.True(t, lkDuration >= lockDuration)
testRWLock.UnLock("sub_threadXX")
testRWLock.UnLock("sub_thread")
wg.Done()
}()
time.Sleep(lockDuration)
testRWLock.UnLock("main_thread")
wg.Wait()
}
func TestMetricsLockRLock(t *testing.T) {
lManager := &MetricsLockManager{
rwLocks: make(map[string]*MetricsRWMutex, 0),
}
params.Params.Init()
params.Params.Save(params.Params.CommonCfg.EnableLockMetrics.Key, "true")
params.Params.Save(params.Params.CommonCfg.LockSlowLogWarnThreshold.Key, "10")
lName := "testLock"
lockDuration := 10 * time.Millisecond
testRWLock := lManager.applyRWLock(lName)
wg := sync.WaitGroup{}
testRWLock.RLock("main_thread")
go func() {
wg.Add(1)
before := time.Now()
testRWLock.Lock("sub_thread")
lkDuration := time.Since(before)
assert.True(t, lkDuration >= lockDuration)
testRWLock.UnLock("sub_thread")
wg.Done()
}()
time.Sleep(lockDuration)
assert.Equal(t, 1, len(testRWLock.acquireTimeMap))
testRWLock.RUnLock("main_threadXXX")
assert.Equal(t, 1, len(testRWLock.acquireTimeMap))
testRWLock.RUnLock("main_thread")
wg.Wait()
assert.Equal(t, 0, len(testRWLock.acquireTimeMap))
}

View File

@ -228,6 +228,11 @@ type commonConfig struct {
ImportMaxFileSize ParamItem `refreshable:"true"`
MetricsPort ParamItem `refreshable:"false"`
//lock related params
EnableLockMetrics ParamItem `refreshable:"false"`
LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
}
func (p *commonConfig) init(base *BaseTable) {
@ -679,6 +684,33 @@ like the old password verification when updating the credential`,
DefaultValue: "9091",
}
p.MetricsPort.Init(base.mgr)
p.EnableLockMetrics = ParamItem{
Key: "common.locks.metrics.enable",
Version: "2.0.0",
DefaultValue: "false",
Doc: "whether gather statistics for metrics locks",
Export: true,
}
p.EnableLockMetrics.Init(base.mgr)
p.LockSlowLogInfoThreshold = ParamItem{
Key: "common.locks.threshold.info",
Version: "2.0.0",
DefaultValue: "500",
Doc: "minimum milliseconds for printing durations in info level",
Export: true,
}
p.LockSlowLogInfoThreshold.Init(base.mgr)
p.LockSlowLogWarnThreshold = ParamItem{
Key: "common.locks.threshold.warn",
Version: "2.0.0",
DefaultValue: "1000",
Doc: "minimum milliseconds for printing durations in warn level",
Export: true,
}
p.LockSlowLogWarnThreshold.Init(base.mgr)
}
type traceConfig struct {