enhance: [2.5] Remove mutex from SegmentManger in querynode (#39051) (#39282)

Remove mutex from SegmentManger in querynode to prevent mutex
contention.

issue: https://github.com/milvus-io/milvus/issues/37630

pr: https://github.com/milvus-io/milvus/pull/39051

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/38900/head^2
yihao.dai 2025-01-16 13:27:02 +08:00 committed by GitHub
parent c741b8be2b
commit c945efa853
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 55 additions and 89 deletions

View File

@ -27,7 +27,6 @@ import "C"
import (
"context"
"fmt"
"sync"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
@ -40,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/cache"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -202,55 +202,59 @@ type SegmentManager interface {
var _ SegmentManager = (*segmentManager)(nil)
type secondarySegmentIndex struct {
shardSegments map[metautil.Channel]segments
keyLock *lock.KeyLock[metautil.Channel]
shardSegments *typeutil.ConcurrentMap[metautil.Channel, segments]
}
func newSecondarySegmentIndex() secondarySegmentIndex {
return secondarySegmentIndex{
shardSegments: make(map[metautil.Channel]segments),
keyLock: lock.NewKeyLock[metautil.Channel](),
shardSegments: typeutil.NewConcurrentMap[metautil.Channel, segments](),
}
}
func (si secondarySegmentIndex) Put(ctx context.Context, segmentType SegmentType, segment Segment) {
shard := segment.Shard()
segments, ok := si.shardSegments[shard]
if !ok {
segments = newSegments()
si.shardSegments[shard] = segments
}
si.keyLock.Lock(shard)
defer si.keyLock.Unlock(shard)
segments, _ := si.shardSegments.GetOrInsert(shard, newSegments())
segments.Put(ctx, segmentType, segment)
}
func (si secondarySegmentIndex) Remove(s Segment) {
shard := s.Shard()
segments, ok := si.shardSegments[shard]
si.keyLock.Lock(shard)
defer si.keyLock.Unlock(shard)
segments, ok := si.shardSegments.Get(shard)
if !ok {
return
}
segments.Remove(s)
if segments.Empty() {
delete(si.shardSegments, shard)
si.shardSegments.Remove(shard)
}
}
type segments struct {
growingSegments map[typeutil.UniqueID]Segment
sealedSegments map[typeutil.UniqueID]Segment
growingSegments *typeutil.ConcurrentMap[typeutil.UniqueID, Segment]
sealedSegments *typeutil.ConcurrentMap[typeutil.UniqueID, Segment]
}
func (segments segments) Put(_ context.Context, segmentType SegmentType, segment Segment) {
switch segmentType {
case SegmentTypeGrowing:
segments.growingSegments[segment.ID()] = segment
segments.growingSegments.Insert(segment.ID(), segment)
case SegmentTypeSealed:
segments.sealedSegments[segment.ID()] = segment
segments.sealedSegments.Insert(segment.ID(), segment)
}
}
func (segments segments) Get(segmentID int64) (Segment, bool) {
if segment, ok := segments.growingSegments[segmentID]; ok {
if segment, ok := segments.growingSegments.Get(segmentID); ok {
return segment, true
} else if segment, ok = segments.sealedSegments[segmentID]; ok {
} else if segment, ok = segments.sealedSegments.Get(segmentID); ok {
return segment, true
}
@ -263,9 +267,9 @@ func (segments segments) GetWithType(segmentID int64, segmentType SegmentType) (
var ok bool
switch segmentType {
case SegmentTypeGrowing:
segment, ok = segments.growingSegments[segmentID]
segment, ok = segments.growingSegments.Get(segmentID)
case SegmentTypeSealed:
segment, ok = segments.sealedSegments[segmentID]
segment, ok = segments.sealedSegments.Get(segmentID)
}
return segment, ok
}
@ -275,11 +279,9 @@ func (segments segments) RemoveWithType(segmentID int64, segmentType SegmentType
var ok bool
switch segmentType {
case SegmentTypeGrowing:
segment, ok = segments.growingSegments[segmentID]
delete(segments.growingSegments, segmentID)
segment, ok = segments.growingSegments.GetAndRemove(segmentID)
case SegmentTypeSealed:
segment, ok = segments.sealedSegments[segmentID]
delete(segments.sealedSegments, segmentID)
segment, ok = segments.sealedSegments.GetAndRemove(segmentID)
}
return segment, ok
}
@ -287,9 +289,9 @@ func (segments segments) RemoveWithType(segmentID int64, segmentType SegmentType
func (segments segments) Remove(segment Segment) {
switch segment.Type() {
case SegmentTypeGrowing:
delete(segments.growingSegments, segment.ID())
segments.growingSegments.Remove(segment.ID())
case SegmentTypeSealed:
delete(segments.sealedSegments, segment.ID())
segments.sealedSegments.Remove(segment.ID())
}
}
@ -313,58 +315,57 @@ func (segments segments) RangeWithFilter(criterion *segmentCriterion, process fu
return
}
var candidates []map[int64]Segment
var candidates []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]
switch criterion.segmentType {
case SegmentTypeGrowing:
candidates = []map[int64]Segment{segments.growingSegments}
candidates = []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]{segments.growingSegments}
case SegmentTypeSealed:
candidates = []map[int64]Segment{segments.sealedSegments}
candidates = []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]{segments.sealedSegments}
default:
candidates = []map[int64]Segment{segments.growingSegments, segments.sealedSegments}
candidates = []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]{segments.growingSegments, segments.sealedSegments}
}
for _, candidate := range candidates {
for id, segment := range candidate {
candidate.Range(func(id typeutil.UniqueID, segment Segment) bool {
if criterion.Match(segment) {
if !process(id, segment.Type(), segment) {
return
return false
}
}
}
return true
})
}
}
func (segments segments) Empty() bool {
return len(segments.growingSegments) == 0 && len(segments.sealedSegments) == 0
return segments.growingSegments.Len() == 0 && segments.sealedSegments.Len() == 0
}
func newSegments() segments {
return segments{
growingSegments: make(map[int64]Segment),
sealedSegments: make(map[int64]Segment),
growingSegments: typeutil.NewConcurrentMap[typeutil.UniqueID, Segment](),
sealedSegments: typeutil.NewConcurrentMap[typeutil.UniqueID, Segment](),
}
}
// Manager manages all collections and segments
type segmentManager struct {
mu sync.RWMutex // guards all
globalSegments segments
secondaryIndex secondarySegmentIndex
// releaseCallback is the callback function when a segment is released.
releaseCallback func(s Segment)
growingOnReleasingSegments typeutil.UniqueSet
sealedOnReleasingSegments typeutil.UniqueSet
growingOnReleasingSegments *typeutil.ConcurrentSet[int64]
sealedOnReleasingSegments *typeutil.ConcurrentSet[int64]
}
func NewSegmentManager() *segmentManager {
return &segmentManager{
globalSegments: newSegments(),
secondaryIndex: newSecondarySegmentIndex(),
growingOnReleasingSegments: typeutil.NewUniqueSet(),
sealedOnReleasingSegments: typeutil.NewUniqueSet(),
growingOnReleasingSegments: typeutil.NewConcurrentSet[int64](),
sealedOnReleasingSegments: typeutil.NewConcurrentSet[int64](),
}
}
@ -376,8 +377,6 @@ func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, seg
func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, segments ...Segment) {
var replacedSegment []Segment
mgr.mu.Lock()
defer mgr.mu.Unlock()
log := log.Ctx(ctx)
for _, segment := range segments {
@ -420,9 +419,6 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
}
func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilter) int {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
updated := 0
mgr.rangeWithFilter(func(_ int64, _ SegmentType, segment Segment) bool {
if action(segment) {
@ -436,9 +432,6 @@ func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilt
// Deprecated:
// TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction.
func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) bool {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
_, ok := mgr.globalSegments.GetWithType(segmentID, typ)
if ok {
return true
@ -458,25 +451,16 @@ func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) b
}
func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
segment, _ := mgr.globalSegments.Get(segmentID)
return segment
}
func (mgr *segmentManager) GetWithType(segmentID typeutil.UniqueID, typ SegmentType) Segment {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
segment, _ := mgr.globalSegments.GetWithType(segmentID, typ)
return segment
}
func (mgr *segmentManager) GetBy(filters ...SegmentFilter) []Segment {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
var ret []Segment
mgr.rangeWithFilter(func(id int64, _ SegmentType, segment Segment) bool {
ret = append(ret, segment)
@ -486,9 +470,6 @@ func (mgr *segmentManager) GetBy(filters ...SegmentFilter) []Segment {
}
func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, error) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
var ret []Segment
var err error
defer func() {
@ -516,9 +497,6 @@ func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, err
}
func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter) ([]Segment, error) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
lockedSegments := make([]Segment, 0, len(segments))
var err error
defer func() {
@ -576,7 +554,7 @@ func (mgr *segmentManager) rangeWithFilter(process func(id int64, segType Segmen
target := mgr.globalSegments
var ok bool
if !criterion.channel.IsZero() {
target, ok = mgr.secondaryIndex.shardSegments[criterion.channel]
target, ok = mgr.secondaryIndex.shardSegments.Get(criterion.channel)
if !ok {
return
}
@ -603,17 +581,12 @@ func (mgr *segmentManager) GetGrowing(segmentID typeutil.UniqueID) Segment {
}
func (mgr *segmentManager) Empty() bool {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
return len(mgr.globalSegments.growingSegments)+len(mgr.globalSegments.sealedSegments) == 0
return mgr.globalSegments.growingSegments.Len()+mgr.globalSegments.sealedSegments.Len() == 0
}
// returns true if the segment exists,
// false otherwise
func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.UniqueID, scope querypb.DataScope) (int, int) {
mgr.mu.Lock()
var removeGrowing, removeSealed int
var growing, sealed Segment
switch scope {
@ -640,7 +613,6 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique
removeSealed = 1
}
}
mgr.mu.Unlock()
if growing != nil {
mgr.release(ctx, growing)
@ -672,8 +644,6 @@ func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID type
}
func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilter) (int, int) {
mgr.mu.Lock()
var removeSegments []Segment
var removeGrowing, removeSealed int
@ -690,8 +660,6 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte
}
return true
}, filters...)
mgr.mu.Unlock()
for _, s := range removeSegments {
mgr.release(ctx, s)
}
@ -699,27 +667,28 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte
}
func (mgr *segmentManager) Clear(ctx context.Context) {
mgr.mu.Lock()
for id := range mgr.globalSegments.growingSegments {
mgr.globalSegments.growingSegments.Range(func(id typeutil.UniqueID, _ Segment) bool {
mgr.growingOnReleasingSegments.Insert(id)
}
return true
})
growingWaitForRelease := mgr.globalSegments.growingSegments
for id := range mgr.globalSegments.sealedSegments {
mgr.globalSegments.sealedSegments.Range(func(id typeutil.UniqueID, _ Segment) bool {
mgr.sealedOnReleasingSegments.Insert(id)
}
return true
})
sealedWaitForRelease := mgr.globalSegments.sealedSegments
mgr.globalSegments = newSegments()
mgr.secondaryIndex = newSecondarySegmentIndex()
mgr.mu.Unlock()
for _, segment := range growingWaitForRelease {
growingWaitForRelease.Range(func(_ typeutil.UniqueID, segment Segment) bool {
mgr.release(ctx, segment)
}
for _, segment := range sealedWaitForRelease {
return true
})
sealedWaitForRelease.Range(func(_ typeutil.UniqueID, segment Segment) bool {
mgr.release(ctx, segment)
}
return true
})
}
// registerReleaseCallback registers the callback function when a segment is released.
@ -744,9 +713,6 @@ func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
segment.Level().String(),
).Dec()
mgr.mu.Lock()
defer mgr.mu.Unlock()
switch segment.Type() {
case SegmentTypeGrowing:
mgr.growingOnReleasingSegments.Remove(segment.ID())