mirror of https://github.com/milvus-io/milvus.git
fix: [10kcp] Fix index meta mutex contention (#38777)
issue: https://github.com/milvus-io/milvus/issues/37630 Reduce the frequency of updateIndexTasksMetrics to avoid holding the mutex for long periods. pr: https://github.com/milvus-io/milvus/pull/38775 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/38794/head
parent
1969ab3da7
commit
7f5467577e
|
@ -21,9 +21,11 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
|
@ -55,6 +57,8 @@ type indexMeta struct {
|
|||
|
||||
// segmentID -> indexID -> segmentIndex
|
||||
segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex
|
||||
|
||||
lastUpdateMetricTime atomic.Time
|
||||
}
|
||||
|
||||
// NewMeta creates meta from provided `kv.TxnKV`
|
||||
|
@ -138,6 +142,11 @@ func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc fu
|
|||
}
|
||||
|
||||
func (m *indexMeta) updateIndexTasksMetrics() {
|
||||
if time.Since(m.lastUpdateMetricTime.Load()) < 120*time.Second {
|
||||
return
|
||||
}
|
||||
defer m.lastUpdateMetricTime.Store(time.Now())
|
||||
start := time.Now()
|
||||
taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
|
||||
for _, segIdx := range m.buildID2SegmentIndex {
|
||||
if segIdx.IsDeleted {
|
||||
|
@ -166,6 +175,7 @@ func (m *indexMeta) updateIndexTasksMetrics() {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics)), zap.Duration("dur", time.Since(start)))
|
||||
}
|
||||
|
||||
func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool {
|
||||
|
@ -782,7 +792,7 @@ func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex {
|
|||
|
||||
segIndexes := make(map[int64]*model.SegmentIndex, len(m.buildID2SegmentIndex))
|
||||
for buildID, segIndex := range m.buildID2SegmentIndex {
|
||||
segIndexes[buildID] = model.CloneSegmentIndex(segIndex)
|
||||
segIndexes[buildID] = segIndex
|
||||
}
|
||||
return segIndexes
|
||||
}
|
||||
|
@ -879,22 +889,6 @@ func (m *indexMeta) CheckCleanSegmentIndex(buildID UniqueID) (bool, *model.Segme
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
metas := make([]*model.SegmentIndex, 0)
|
||||
for _, segIndex := range m.buildID2SegmentIndex {
|
||||
if segIndex.IsDeleted {
|
||||
continue
|
||||
}
|
||||
if nodeID == segIndex.NodeID {
|
||||
metas = append(metas, model.CloneSegmentIndex(segIndex))
|
||||
}
|
||||
}
|
||||
return metas
|
||||
}
|
||||
|
||||
func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
|
Loading…
Reference in New Issue