enhance: [10kcp] Refine querynode collection number metrics (#38352)

Related to #37630

Previously the loaded collection metrics was calculated via scanning all
loaded segment in segment manager, which is slow and buggy
implementation.

This PR:

- Move collection num metrics to collection manager
- Remove deprecated loaded partition metrics update logic

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/38398/head
congqixia 2024-12-10 21:06:42 +08:00 committed by GitHub
parent 4a2a5f0183
commit 5521091dcd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 7 additions and 24 deletions

View File

@ -25,6 +25,7 @@ package segments
import "C"
import (
"fmt"
"sync"
"unsafe"
@ -108,6 +109,11 @@ func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.Collec
collection := NewCollection(collectionID, schema, meta, loadMeta)
collection.Ref(1)
m.collections[collectionID] = collection
m.updateMetric()
}
func (m *collectionManager) updateMetric() {
metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(m.collections)))
}
func (m *collectionManager) Ref(collectionID int64, count uint32) bool {
@ -134,6 +140,7 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool {
DeleteCollection(collection)
metrics.CleanupQueryNodeCollectionMetrics(paramtable.GetNodeID(), collectionID)
m.updateMetric()
return true
}
return false

View File

@ -36,7 +36,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -409,7 +408,6 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
segment.Level().String(),
).Inc()
}
mgr.updateMetric()
// release replaced segment
if len(replacedSegment) > 0 {
@ -642,7 +640,6 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique
removeSealed = 1
}
}
mgr.updateMetric()
mgr.mu.Unlock()
if growing != nil {
@ -693,7 +690,6 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte
}
return true
}, filters...)
mgr.updateMetric()
mgr.mu.Unlock()
for _, s := range removeSegments {
@ -716,7 +712,6 @@ func (mgr *segmentManager) Clear(ctx context.Context) {
sealedWaitForRelease := mgr.globalSegments.sealedSegments
mgr.globalSegments = newSegments()
mgr.secondaryIndex = newSecondarySegmentIndex()
mgr.updateMetric()
mgr.mu.Unlock()
for _, segment := range growingWaitForRelease {
@ -733,25 +728,6 @@ func (mgr *segmentManager) registerReleaseCallback(callback func(s Segment)) {
mgr.releaseCallback = callback
}
func (mgr *segmentManager) updateMetric() {
// update collection and partiation metric
collections, partitions := make(typeutil.Set[int64]), make(typeutil.Set[int64])
for _, seg := range mgr.globalSegments.growingSegments {
collections.Insert(seg.Collection())
if seg.Partition() != common.AllPartitionsID {
partitions.Insert(seg.Partition())
}
}
for _, seg := range mgr.globalSegments.sealedSegments {
collections.Insert(seg.Collection())
if seg.Partition() != common.AllPartitionsID {
partitions.Insert(seg.Partition())
}
}
metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(collections.Len()))
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partitions.Len()))
}
func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
if mgr.releaseCallback != nil {
mgr.releaseCallback(segment)