From c945efa853c235429242c118192a4a9c31ea3d3e Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 16 Jan 2025 13:27:02 +0800 Subject: [PATCH] enhance: [2.5] Remove mutex from SegmentManger in querynode (#39051) (#39282) Remove mutex from SegmentManger in querynode to prevent mutex contention. issue: https://github.com/milvus-io/milvus/issues/37630 pr: https://github.com/milvus-io/milvus/pull/39051 Signed-off-by: bigsheeper --- internal/querynodev2/segments/manager.go | 144 +++++++++-------------- 1 file changed, 55 insertions(+), 89 deletions(-) diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 2c66302fb5..9dc066e04f 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -27,7 +27,6 @@ import "C" import ( "context" "fmt" - "sync" "go.uber.org/zap" "golang.org/x/sync/singleflight" @@ -40,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/querypb" "github.com/milvus-io/milvus/pkg/util/cache" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -202,55 +202,59 @@ type SegmentManager interface { var _ SegmentManager = (*segmentManager)(nil) type secondarySegmentIndex struct { - shardSegments map[metautil.Channel]segments + keyLock *lock.KeyLock[metautil.Channel] + shardSegments *typeutil.ConcurrentMap[metautil.Channel, segments] } func newSecondarySegmentIndex() secondarySegmentIndex { return secondarySegmentIndex{ - shardSegments: make(map[metautil.Channel]segments), + keyLock: lock.NewKeyLock[metautil.Channel](), + shardSegments: typeutil.NewConcurrentMap[metautil.Channel, segments](), } } func (si secondarySegmentIndex) Put(ctx context.Context, segmentType SegmentType, segment Segment) { shard := segment.Shard() - segments, ok := si.shardSegments[shard] - if !ok { - segments = newSegments() - si.shardSegments[shard] = segments - } + si.keyLock.Lock(shard) + defer si.keyLock.Unlock(shard) + + segments, _ := si.shardSegments.GetOrInsert(shard, newSegments()) segments.Put(ctx, segmentType, segment) } func (si secondarySegmentIndex) Remove(s Segment) { shard := s.Shard() - segments, ok := si.shardSegments[shard] + si.keyLock.Lock(shard) + defer si.keyLock.Unlock(shard) + + segments, ok := si.shardSegments.Get(shard) if !ok { return } segments.Remove(s) if segments.Empty() { - delete(si.shardSegments, shard) + si.shardSegments.Remove(shard) } } type segments struct { - growingSegments map[typeutil.UniqueID]Segment - sealedSegments map[typeutil.UniqueID]Segment + growingSegments *typeutil.ConcurrentMap[typeutil.UniqueID, Segment] + sealedSegments *typeutil.ConcurrentMap[typeutil.UniqueID, Segment] } func (segments segments) Put(_ context.Context, segmentType SegmentType, segment Segment) { switch segmentType { case SegmentTypeGrowing: - segments.growingSegments[segment.ID()] = segment + segments.growingSegments.Insert(segment.ID(), segment) case SegmentTypeSealed: - segments.sealedSegments[segment.ID()] = segment + segments.sealedSegments.Insert(segment.ID(), segment) } } func (segments segments) Get(segmentID int64) (Segment, bool) { - if segment, ok := segments.growingSegments[segmentID]; ok { + if segment, ok := segments.growingSegments.Get(segmentID); ok { return segment, true - } else if segment, ok = segments.sealedSegments[segmentID]; ok { + } else if segment, ok = segments.sealedSegments.Get(segmentID); ok { return segment, true } @@ -263,9 +267,9 @@ func (segments segments) GetWithType(segmentID int64, segmentType SegmentType) ( var ok bool switch segmentType { case SegmentTypeGrowing: - segment, ok = segments.growingSegments[segmentID] + segment, ok = segments.growingSegments.Get(segmentID) case SegmentTypeSealed: - segment, ok = segments.sealedSegments[segmentID] + segment, ok = segments.sealedSegments.Get(segmentID) } return segment, ok } @@ -275,11 +279,9 @@ func (segments segments) RemoveWithType(segmentID int64, segmentType SegmentType var ok bool switch segmentType { case SegmentTypeGrowing: - segment, ok = segments.growingSegments[segmentID] - delete(segments.growingSegments, segmentID) + segment, ok = segments.growingSegments.GetAndRemove(segmentID) case SegmentTypeSealed: - segment, ok = segments.sealedSegments[segmentID] - delete(segments.sealedSegments, segmentID) + segment, ok = segments.sealedSegments.GetAndRemove(segmentID) } return segment, ok } @@ -287,9 +289,9 @@ func (segments segments) RemoveWithType(segmentID int64, segmentType SegmentType func (segments segments) Remove(segment Segment) { switch segment.Type() { case SegmentTypeGrowing: - delete(segments.growingSegments, segment.ID()) + segments.growingSegments.Remove(segment.ID()) case SegmentTypeSealed: - delete(segments.sealedSegments, segment.ID()) + segments.sealedSegments.Remove(segment.ID()) } } @@ -313,58 +315,57 @@ func (segments segments) RangeWithFilter(criterion *segmentCriterion, process fu return } - var candidates []map[int64]Segment + var candidates []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment] switch criterion.segmentType { case SegmentTypeGrowing: - candidates = []map[int64]Segment{segments.growingSegments} + candidates = []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]{segments.growingSegments} case SegmentTypeSealed: - candidates = []map[int64]Segment{segments.sealedSegments} + candidates = []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]{segments.sealedSegments} default: - candidates = []map[int64]Segment{segments.growingSegments, segments.sealedSegments} + candidates = []*typeutil.ConcurrentMap[typeutil.UniqueID, Segment]{segments.growingSegments, segments.sealedSegments} } for _, candidate := range candidates { - for id, segment := range candidate { + candidate.Range(func(id typeutil.UniqueID, segment Segment) bool { if criterion.Match(segment) { if !process(id, segment.Type(), segment) { - return + return false } } - } + return true + }) } } func (segments segments) Empty() bool { - return len(segments.growingSegments) == 0 && len(segments.sealedSegments) == 0 + return segments.growingSegments.Len() == 0 && segments.sealedSegments.Len() == 0 } func newSegments() segments { return segments{ - growingSegments: make(map[int64]Segment), - sealedSegments: make(map[int64]Segment), + growingSegments: typeutil.NewConcurrentMap[typeutil.UniqueID, Segment](), + sealedSegments: typeutil.NewConcurrentMap[typeutil.UniqueID, Segment](), } } // Manager manages all collections and segments type segmentManager struct { - mu sync.RWMutex // guards all - globalSegments segments secondaryIndex secondarySegmentIndex // releaseCallback is the callback function when a segment is released. releaseCallback func(s Segment) - growingOnReleasingSegments typeutil.UniqueSet - sealedOnReleasingSegments typeutil.UniqueSet + growingOnReleasingSegments *typeutil.ConcurrentSet[int64] + sealedOnReleasingSegments *typeutil.ConcurrentSet[int64] } func NewSegmentManager() *segmentManager { return &segmentManager{ globalSegments: newSegments(), secondaryIndex: newSecondarySegmentIndex(), - growingOnReleasingSegments: typeutil.NewUniqueSet(), - sealedOnReleasingSegments: typeutil.NewUniqueSet(), + growingOnReleasingSegments: typeutil.NewConcurrentSet[int64](), + sealedOnReleasingSegments: typeutil.NewConcurrentSet[int64](), } } @@ -376,8 +377,6 @@ func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, seg func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, segments ...Segment) { var replacedSegment []Segment - mgr.mu.Lock() - defer mgr.mu.Unlock() log := log.Ctx(ctx) for _, segment := range segments { @@ -420,9 +419,6 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg } func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilter) int { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - updated := 0 mgr.rangeWithFilter(func(_ int64, _ SegmentType, segment Segment) bool { if action(segment) { @@ -436,9 +432,6 @@ func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilt // Deprecated: // TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction. func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) bool { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - _, ok := mgr.globalSegments.GetWithType(segmentID, typ) if ok { return true @@ -458,25 +451,16 @@ func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) b } func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - segment, _ := mgr.globalSegments.Get(segmentID) return segment } func (mgr *segmentManager) GetWithType(segmentID typeutil.UniqueID, typ SegmentType) Segment { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - segment, _ := mgr.globalSegments.GetWithType(segmentID, typ) return segment } func (mgr *segmentManager) GetBy(filters ...SegmentFilter) []Segment { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - var ret []Segment mgr.rangeWithFilter(func(id int64, _ SegmentType, segment Segment) bool { ret = append(ret, segment) @@ -486,9 +470,6 @@ func (mgr *segmentManager) GetBy(filters ...SegmentFilter) []Segment { } func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, error) { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - var ret []Segment var err error defer func() { @@ -516,9 +497,6 @@ func (mgr *segmentManager) GetAndPinBy(filters ...SegmentFilter) ([]Segment, err } func (mgr *segmentManager) GetAndPin(segments []int64, filters ...SegmentFilter) ([]Segment, error) { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - lockedSegments := make([]Segment, 0, len(segments)) var err error defer func() { @@ -576,7 +554,7 @@ func (mgr *segmentManager) rangeWithFilter(process func(id int64, segType Segmen target := mgr.globalSegments var ok bool if !criterion.channel.IsZero() { - target, ok = mgr.secondaryIndex.shardSegments[criterion.channel] + target, ok = mgr.secondaryIndex.shardSegments.Get(criterion.channel) if !ok { return } @@ -603,17 +581,12 @@ func (mgr *segmentManager) GetGrowing(segmentID typeutil.UniqueID) Segment { } func (mgr *segmentManager) Empty() bool { - mgr.mu.RLock() - defer mgr.mu.RUnlock() - - return len(mgr.globalSegments.growingSegments)+len(mgr.globalSegments.sealedSegments) == 0 + return mgr.globalSegments.growingSegments.Len()+mgr.globalSegments.sealedSegments.Len() == 0 } // returns true if the segment exists, // false otherwise func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.UniqueID, scope querypb.DataScope) (int, int) { - mgr.mu.Lock() - var removeGrowing, removeSealed int var growing, sealed Segment switch scope { @@ -640,7 +613,6 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique removeSealed = 1 } } - mgr.mu.Unlock() if growing != nil { mgr.release(ctx, growing) @@ -672,8 +644,6 @@ func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID type } func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilter) (int, int) { - mgr.mu.Lock() - var removeSegments []Segment var removeGrowing, removeSealed int @@ -690,8 +660,6 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte } return true }, filters...) - mgr.mu.Unlock() - for _, s := range removeSegments { mgr.release(ctx, s) } @@ -699,27 +667,28 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte } func (mgr *segmentManager) Clear(ctx context.Context) { - mgr.mu.Lock() - - for id := range mgr.globalSegments.growingSegments { + mgr.globalSegments.growingSegments.Range(func(id typeutil.UniqueID, _ Segment) bool { mgr.growingOnReleasingSegments.Insert(id) - } + return true + }) growingWaitForRelease := mgr.globalSegments.growingSegments - for id := range mgr.globalSegments.sealedSegments { + mgr.globalSegments.sealedSegments.Range(func(id typeutil.UniqueID, _ Segment) bool { mgr.sealedOnReleasingSegments.Insert(id) - } + return true + }) sealedWaitForRelease := mgr.globalSegments.sealedSegments mgr.globalSegments = newSegments() mgr.secondaryIndex = newSecondarySegmentIndex() - mgr.mu.Unlock() - for _, segment := range growingWaitForRelease { + growingWaitForRelease.Range(func(_ typeutil.UniqueID, segment Segment) bool { mgr.release(ctx, segment) - } - for _, segment := range sealedWaitForRelease { + return true + }) + sealedWaitForRelease.Range(func(_ typeutil.UniqueID, segment Segment) bool { mgr.release(ctx, segment) - } + return true + }) } // registerReleaseCallback registers the callback function when a segment is released. @@ -744,9 +713,6 @@ func (mgr *segmentManager) release(ctx context.Context, segment Segment) { segment.Level().String(), ).Dec() - mgr.mu.Lock() - defer mgr.mu.Unlock() - switch segment.Type() { case SegmentTypeGrowing: mgr.growingOnReleasingSegments.Remove(segment.ID())