enhance: add metrics for load segment progress (#31005)

This PR adds metrics for load segment progress:
1. adds metrics for load segment/index concurrency
2. adds metrics for load index latency
3. changes the load segment latency's time unit to milliseconds (ms)

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/31071/head
wei liu 2024-03-08 15:39:02 +08:00 committed by GitHub
parent 122981aeb9
commit 5016031dc7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 43 additions and 3 deletions

View File

@ -194,6 +194,8 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
segmentID := loadInfo.SegmentID
segment, _ := newSegments.Get(segmentID)
metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Inc()
defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec()
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
var err error
@ -216,7 +218,7 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
log.Info("load segment done", zap.Int64("segmentID", segmentID))
loader.notifyLoadFinish(loadInfo)
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
@ -415,9 +417,11 @@ func (loader *segmentLoaderV2) loadSegment(ctx context.Context,
if err != nil {
return err
}
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
return err
}
metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
if err := loader.loadSealedSegmentFields(ctx, segment, fieldsMap, loadInfo.GetNumOfRows()); err != nil {
return err
@ -632,6 +636,9 @@ func (loader *segmentLoader) Load(ctx context.Context,
segmentID := loadInfo.SegmentID
segment, _ := newSegments.Get(segmentID)
metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Inc()
defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadSegment").Dec()
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
var err error
@ -654,7 +661,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("load segment done", zap.Int64("segmentID", segmentID))
loader.notifyLoadFinish(loadInfo)
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
@ -663,6 +670,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
log.Info("start to load segments in parallel",
zap.Int("segmentNum", len(infos)),
zap.Int("concurrencyLevel", concurrencyLevel))
err = funcutil.ProcessFuncParallel(len(infos),
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
if err != nil {
@ -970,12 +978,15 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
return err
}
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
)
if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
return err
}
metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
for fieldID, info := range indexedFieldInfos {
field, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
@ -1508,7 +1519,11 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
defer loader.freeRequest(resource)
log.Info("segment loader start to load index", zap.Int("segmentNumAfterFilter", len(infos)))
metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Inc()
defer metrics.QueryNodeLoadSegmentConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), "LoadIndex").Dec()
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
defer metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))
for _, loadInfo := range infos {
fieldIDs := typeutil.NewSet(lo.Map(loadInfo.GetIndexInfos(), func(info *querypb.FieldIndexInfo, _ int) int64 { return info.GetFieldID() })...)
fieldInfos := lo.SliceToMap(lo.Filter(loadInfo.GetBinlogPaths(), func(info *datapb.FieldBinlog, _ int) bool { return fieldIDs.Contain(info.GetFieldID()) }),

View File

@ -96,6 +96,7 @@ const (
lockSource = "lock_source"
lockType = "lock_type"
lockOp = "lock_op"
loadTypeName = "load_type"
)
var (

View File

@ -235,7 +235,7 @@ var (
Subsystem: typeutil.QueryNodeRole,
Name: "load_segment_latency",
Help: "latency of load per segment",
Buckets: []float64{0.1, 0.5, 1, 5, 10, 20, 50, 100, 300, 600, 1200}, // unit seconds
Buckets: longTaskBuckets, // unit milliseconds
}, []string{
nodeIDLabelName,
})
@ -479,6 +479,28 @@ var (
Name: "stopping_balance_segment_num",
Help: "the number of segment which executing stopping balance",
}, []string{nodeIDLabelName})
QueryNodeLoadSegmentConcurrency = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "load_segment_concurrency",
Help: "number of concurrent loading segments in QueryNode",
}, []string{
nodeIDLabelName,
loadTypeName,
})
QueryNodeLoadIndexLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "load_index_latency",
Help: "latency of load per segment's index, in milliseconds",
Buckets: longTaskBuckets, // unit milliseconds
}, []string{
nodeIDLabelName,
})
)
// RegisterQueryNode registers QueryNode metrics
@ -524,6 +546,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(StoppingBalanceNodeNum)
registry.MustRegister(StoppingBalanceChannelNum)
registry.MustRegister(StoppingBalanceSegmentNum)
registry.MustRegister(QueryNodeLoadSegmentConcurrency)
registry.MustRegister(QueryNodeLoadIndexLatency)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {