Add Prometheus metrics for QueryCoord (#15606)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

Co-authored-by: jingkl <34296482+jingkl@users.noreply.github.com>
pull/15649/head
bigsheeper 2022-02-28 16:51:55 +08:00 committed by GitHub
parent cc0fa99410
commit e1bfd14db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 243 additions and 6 deletions

View File

@ -18,7 +18,6 @@ package metrics
import (
"net/http"
// nolint:gosec
_ "net/http/pprof"
@ -587,11 +586,6 @@ func RegisterProxy() {
prometheus.MustRegister(ProxyDmlChannelTimeTick)
}
// RegisterQueryCoord registers QueryCoord metrics.
// The body is empty, so calling it registers no collectors.
func RegisterQueryCoord() {
}
//RegisterQueryNode registers QueryNode metrics
func RegisterQueryNode() {

View File

@ -0,0 +1,143 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// Label names and label values used by the QueryCoord metrics.
const (
// TODO: move to metrics.go
// queryCoordStatusLabel is the label name carried by the load and release
// request counters.
queryCoordStatusLabel = "status"
// QueryCoordMetricLabelSuccess is the status label value for requests that succeeded.
QueryCoordMetricLabelSuccess = "success"
// QueryCoordMetricLabelFail is the status label value for requests that failed.
QueryCoordMetricLabelFail = "fail"
// QueryCoordMetricLabelTotal is the status label value counting every received request.
QueryCoordMetricLabelTotal = "total"
// TODO: move to metrics.go
// collectionIDLabel is the label name that carries a collection ID.
collectionIDLabel = "collection_id"
)
// queryCoordLoadBuckets holds the histogram bucket upper bounds, in
// milliseconds, shared by the load-latency and child-task-latency histograms.
// It is 18 exponential buckets starting at 10 and doubling at every step:
// [10 20 40 80 160 320 640 1280 2560 5120 10240 20480 40960 81920 163840 327680 655360 1.31072e+06]
var queryCoordLoadBuckets = prometheus.ExponentialBuckets(10, 2, 18)
// QueryCoord metric collectors. Every collector is a *Vec type, so callers
// obtain the concrete metric through WithLabelValues, passing no values for
// the collectors declared with an empty label list.
var (
// QueryCoordNumCollections is a gauge (no labels) for the number of
// collections in QueryCoord.
QueryCoordNumCollections = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_collections",
Help: "Number of collections in QueryCoord.",
}, []string{})
// QueryCoordNumEntities is a gauge for the number of entities, with one
// series per collection ID (label collectionIDLabel).
QueryCoordNumEntities = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_entities",
Help: "Number of entities in collection.",
}, []string{
collectionIDLabel,
})
// QueryCoordLoadCount counts load requests, partitioned by the status
// label (queryCoordStatusLabel).
QueryCoordLoadCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "load_count",
Help: "Load request statistic in QueryCoord.",
}, []string{
queryCoordStatusLabel,
})
// QueryCoordReleaseCount counts release requests, partitioned by the
// status label (queryCoordStatusLabel).
QueryCoordReleaseCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "release_count",
Help: "Release request statistic in QueryCoord.",
}, []string{
queryCoordStatusLabel,
})
// QueryCoordLoadLatency is a histogram (no labels) of load request latency
// using the queryCoordLoadBuckets boundaries.
QueryCoordLoadLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "load_latency",
Help: "Load request latency in QueryCoord",
Buckets: queryCoordLoadBuckets,
}, []string{})
// QueryCoordReleaseLatency is a histogram (no labels) of release request
// latency. It uses its own fixed bucket boundaries from 0 to 10000 rather
// than queryCoordLoadBuckets.
// NOTE(review): observed values are presumably milliseconds, like the load
// latency — confirm at the call sites.
QueryCoordReleaseLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "release_latency",
Help: "Release request latency in QueryCoord",
Buckets: []float64{0, 5, 10, 20, 40, 100, 200, 400, 1000, 10000},
}, []string{})
// QueryCoordNumChildTasks is a gauge (no labels) for the number of child
// tasks in QueryCoord.
QueryCoordNumChildTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_child_tasks",
Help: "Number of child tasks in QueryCoord.",
}, []string{})
// QueryCoordNumParentTasks is a gauge (no labels) for the number of parent
// tasks in QueryCoord.
QueryCoordNumParentTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_parent_tasks",
Help: "Number of parent tasks in QueryCoord.",
}, []string{})
// QueryCoordChildTaskLatency is a histogram (no labels) of child task
// latency using the queryCoordLoadBuckets boundaries.
QueryCoordChildTaskLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "child_task_latency",
Help: "Child tasks latency in QueryCoord.",
Buckets: queryCoordLoadBuckets,
}, []string{})
// QueryCoordNumQueryNodes is a gauge (no labels) for the number of
// QueryNodes known to QueryCoord.
QueryCoordNumQueryNodes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryCoordRole,
Name: "num_querynodes",
Help: "Number of QueryNodes in QueryCoord.",
}, []string{})
)
// RegisterQueryCoord registers every QueryCoord collector with the default
// Prometheus registry, in declaration order. prometheus.MustRegister panics
// if a registration fails, so a failure aborts at the first offending
// collector.
func RegisterQueryCoord() {
	collectors := []prometheus.Collector{
		QueryCoordNumCollections,
		QueryCoordNumEntities,
		QueryCoordLoadCount,
		QueryCoordReleaseCount,
		QueryCoordLoadLatency,
		QueryCoordReleaseLatency,
		QueryCoordNumChildTasks,
		QueryCoordNumParentTasks,
		QueryCoordChildTaskLatency,
		QueryCoordNumQueryNodes,
	}
	for _, collector := range collectors {
		prometheus.MustRegister(collector)
	}
}

View File

@ -29,6 +29,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@ -573,6 +574,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
go node.start()
}
c.nodes[id] = node
metrics.QueryCoordNumQueryNodes.WithLabelValues().Inc()
log.Debug("registerNode: create a new QueryNode", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state))
return nil
}
@ -605,6 +607,7 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
}
delete(c.nodes, nodeID)
metrics.QueryCoordNumQueryNodes.WithLabelValues().Dec()
log.Debug("removeNodeInfo: delete nodeInfo in cluster MetaReplica", zap.Int64("nodeID", nodeID))
return nil

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@ -158,6 +159,8 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
// LoadCollection loads all the sealed segments of this collection to queryNodes, and assigns watchDmChannelRequest to queryNodes
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
collectionID := req.CollectionID
//schema := req.Schema
log.Debug("loadCollectionRequest received",
@ -173,6 +176,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("load collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -183,6 +188,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
return status, nil
}
// if some partitions of the collection have been loaded by load partitions request, return error
@ -198,6 +205,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
zap.Int64s("loaded partitionIDs", collectionInfo.PartitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
}
@ -219,6 +228,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -231,6 +242,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -244,6 +257,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
// ReleaseCollection clears all data related to this collection on the querynode
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
//dbID := req.DbID
collectionID := req.CollectionID
log.Debug("releaseCollectionRequest received",
@ -259,6 +273,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("release collection failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -269,6 +285,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
return status, nil
}
@ -289,6 +307,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -301,6 +321,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -310,6 +332,9 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
zap.Int64("msgID", req.Base.MsgID))
//qc.MetaReplica.printMeta()
//qc.cluster.printMeta()
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(releaseCollectionTask.elapseSpan().Milliseconds()))
return status, nil
}
@ -404,6 +429,7 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
// LoadPartitions loads all the sealed segments of this partition to queryNodes, and assigns watchDmChannelRequest to queryNodes
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
@ -421,6 +447,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("load partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -436,6 +464,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -475,6 +504,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -483,6 +514,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
return status, nil
}
@ -504,6 +537,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -517,6 +552,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -531,6 +568,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
// ReleasePartitions clears all data related to this partition on the querynode
func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelTotal).Inc()
//dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
@ -548,6 +587,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -561,6 +602,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -579,6 +621,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -599,6 +643,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
return status, nil
}
@ -608,6 +654,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64("msgID", req.Base.MsgID))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
return status, nil
}
@ -650,6 +698,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -663,6 +713,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelFail).Inc()
return status, nil
}
@ -674,6 +726,9 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
//qc.MetaReplica.printMeta()
//qc.cluster.printMeta()
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(releaseTask.elapseSpan().Milliseconds()))
return status, nil
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -165,6 +166,7 @@ func (m *MetaReplica) reloadFromKV() error {
}
m.collectionInfos[collectionID] = collectionInfo
}
metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(m.collectionInfos)))
if err := m.segmentsInfo.loadSegments(); err != nil {
return err
@ -299,6 +301,7 @@ func (m *MetaReplica) addCollection(collectionID UniqueID, loadType querypb.Load
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = newCollection
metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(m.collectionInfos)))
m.collectionMu.Unlock()
}
@ -362,6 +365,7 @@ func (m *MetaReplica) releaseCollection(collectionID UniqueID) error {
m.collectionMu.Lock()
delete(m.collectionInfos, collectionID)
metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(m.collectionInfos)))
m.collectionMu.Unlock()
m.deltaChannelMu.Lock()

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
)
@ -57,6 +58,7 @@ func (s *segmentsInfo) loadSegments() error {
return
}
s.segmentIDMap[segment.GetSegmentID()] = segment
metrics.QueryCoordNumEntities.WithLabelValues(fmt.Sprint(segment.CollectionID)).Add(float64(segment.NumRows))
}
})
return err
@ -75,6 +77,7 @@ func (s *segmentsInfo) saveSegment(segment *querypb.SegmentInfo) error {
return err
}
s.segmentIDMap[segment.GetSegmentID()] = segment
metrics.QueryCoordNumEntities.WithLabelValues(fmt.Sprint(segment.CollectionID)).Add(float64(segment.NumRows))
return nil
}
@ -86,6 +89,7 @@ func (s *segmentsInfo) removeSegment(segment *querypb.SegmentInfo) error {
return err
}
delete(s.segmentIDMap, segment.GetSegmentID())
metrics.QueryCoordNumEntities.WithLabelValues(fmt.Sprint(segment.CollectionID)).Sub(float64(segment.NumRows))
return nil
}

View File

@ -27,10 +27,12 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/timerecord"
)
// timeoutForRPC is a fixed 10-second duration.
// NOTE(review): presumably the deadline applied to RPC calls issued by tasks — confirm at the call sites.
const timeoutForRPC = 10 * time.Second
@ -91,6 +93,7 @@ type task interface {
setResultInfo(err error)
getResultInfo() *commonpb.Status
updateTaskProcess()
elapseSpan() time.Duration
}
type baseTask struct {
@ -111,6 +114,8 @@ type baseTask struct {
parentTask task
childTasks []task
childTasksMu sync.RWMutex
timeRecorder *timerecord.TimeRecorder
}
func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask {
@ -125,6 +130,7 @@ func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *bas
retryCount: MaxRetryNum,
triggerCondition: triggerType,
childTasks: []task{},
timeRecorder: timerecord.NewTimeRecorder("QueryCoordBaseTask"),
}
return baseTask
@ -197,6 +203,7 @@ func (bt *baseTask) removeChildTaskByID(taskID UniqueID) {
}
}
bt.childTasks = result
metrics.QueryCoordNumChildTasks.WithLabelValues().Dec()
}
func (bt *baseTask) clearChildTasks() {
@ -272,12 +279,17 @@ func (bt *baseTask) rollBack(ctx context.Context) []task {
return nil
}
// elapseSpan reports the duration returned by the task's time recorder.
func (bt *baseTask) elapseSpan() time.Duration {
	elapsed := bt.timeRecorder.ElapseSpan()
	return elapsed
}
// loadCollectionTask is the task built around a LoadCollectionRequest. It
// embeds both baseTask and the request, so their fields and methods are
// promoted onto the task.
type loadCollectionTask struct {
*baseTask
*querypb.LoadCollectionRequest
// broker, cluster and meta are the collaborators the task works with;
// meta is where the loaded collection gets recorded.
broker *globalMetaBroker
cluster Cluster
meta Meta
// once makes sure the load success/latency metrics are reported a single
// time from updateTaskProcess.
once sync.Once
}
func (lct *loadCollectionTask) msgBase() *commonpb.MsgBase {
@ -331,6 +343,11 @@ func (lct *loadCollectionTask) updateTaskProcess() {
log.Error("loadCollectionTask: set load percentage to meta's collectionInfo", zap.Int64("collectionID", collectionID))
lct.setResultInfo(err)
}
lct.once.Do(func() {
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(lct.elapseSpan().Milliseconds()))
metrics.QueryCoordNumChildTasks.WithLabelValues().Sub(float64(len(lct.getChildTask())))
})
}
}
@ -430,6 +447,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
lct.addChildTask(internalTask)
log.Debug("loadCollectionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("msgID", lct.Base.MsgID))
}
metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks)))
log.Debug("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID))
err = lct.meta.addCollection(collectionID, querypb.LoadType_loadCollection, lct.Schema)
@ -624,6 +642,7 @@ type loadPartitionTask struct {
cluster Cluster
meta Meta
addCol bool
once sync.Once
}
func (lpt *loadPartitionTask) msgBase() *commonpb.MsgBase {
@ -678,6 +697,11 @@ func (lpt *loadPartitionTask) updateTaskProcess() {
lpt.setResultInfo(err)
}
}
lpt.once.Do(func() {
metrics.QueryCoordLoadCount.WithLabelValues(metrics.QueryCoordMetricLabelSuccess).Inc()
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(lpt.elapseSpan().Milliseconds()))
metrics.QueryCoordNumChildTasks.WithLabelValues().Sub(float64(len(lpt.getChildTask())))
})
}
}
@ -765,6 +789,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
lpt.addChildTask(internalTask)
log.Debug("loadPartitionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())))
}
metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks)))
log.Debug("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("msgID", lpt.Base.MsgID))
err = lpt.meta.addCollection(collectionID, querypb.LoadType_LoadPartition, lpt.Schema)

View File

@ -31,6 +31,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/retry"
@ -70,6 +71,7 @@ func (queue *taskQueue) addTask(t task) {
if queue.tasks.Len() == 0 {
queue.taskChan <- 1
queue.tasks.PushBack(t)
metrics.QueryCoordNumParentTasks.WithLabelValues().Inc()
return
}
@ -87,6 +89,8 @@ func (queue *taskQueue) addTask(t task) {
queue.tasks.InsertAfter(t, e)
break
}
metrics.QueryCoordNumParentTasks.WithLabelValues().Inc()
}
func (queue *taskQueue) addTaskToFront(t task) {
@ -96,6 +100,8 @@ func (queue *taskQueue) addTaskToFront(t task) {
} else {
queue.tasks.PushFront(t)
}
metrics.QueryCoordNumParentTasks.WithLabelValues().Inc()
}
// popTask pops a trigger task from the task list
@ -111,6 +117,8 @@ func (queue *taskQueue) popTask() task {
ft := queue.tasks.Front()
queue.tasks.Remove(ft)
metrics.QueryCoordNumParentTasks.WithLabelValues().Dec()
return ft.Value.(task)
}
@ -823,6 +831,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
log.Debug("waitActivateTaskDone: one activate task done",
zap.Int64("taskID", t.getTaskID()),
zap.Int64("triggerTaskID", triggerTask.getTaskID()))
metrics.QueryCoordChildTaskLatency.WithLabelValues().Observe(float64(t.elapseSpan().Milliseconds()))
}
}