From 0baeb609dc766af2494c36b1e5af4cc4cd1e42bf Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Thu, 30 Jun 2022 17:26:19 +0800 Subject: [PATCH] Fix bugs in monitoring metrics (#17894) Signed-off-by: zhenshan.cao --- internal/datacoord/meta.go | 14 +++++++------- internal/datacoord/server.go | 5 ++++- internal/datanode/flow_graph_delete_node.go | 2 +- internal/datanode/segment_replica.go | 8 ++++---- internal/metrics/datacoord_metrics.go | 1 + internal/metrics/proxy_metrics.go | 13 +------------ internal/metrics/querynode_metrics.go | 4 ++-- internal/metrics/rootcoord_metrics.go | 2 +- internal/querynode/search.go | 8 ++++++-- internal/rootcoord/dml_channels.go | 4 ++-- 10 files changed, 29 insertions(+), 32 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 397e28e616..d84f8f8741 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -85,7 +85,7 @@ func (m *meta) reloadFromKV() error { } state := segmentInfo.GetState() m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo)) - metrics.DataCoordNumSegments.WithLabelValues(string(state)).Inc() + metrics.DataCoordNumSegments.WithLabelValues(state.String()).Inc() if state == commonpb.SegmentState_Flushed { numStoredRows += segmentInfo.GetNumOfRows() } @@ -175,7 +175,7 @@ func (m *meta) AddSegment(segment *SegmentInfo) error { if err := m.saveSegmentInfo(segment); err != nil { return err } - metrics.DataCoordNumSegments.WithLabelValues(string(segment.GetState())).Inc() + metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc() return nil } @@ -233,8 +233,8 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error { if curSegInfo != nil && isSegmentHealthy(curSegInfo) { err := m.saveSegmentInfo(curSegInfo) if err == nil { - metrics.DataCoordNumSegments.WithLabelValues(string(oldState)).Dec() - metrics.DataCoordNumSegments.WithLabelValues(string(state)).Inc() + metrics.DataCoordNumSegments.WithLabelValues(oldState.String()).Dec() + metrics.DataCoordNumSegments.WithLabelValues(state.String()).Inc() if state == commonpb.SegmentState_Flushed { metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(curSegInfo.GetNumOfRows())) metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(curSegInfo.GetNumOfRows())) @@ -397,8 +397,8 @@ func (m *meta) UpdateFlushSegmentsInfo( } oldSegmentState := segment.GetState() newSegmentState := clonedSegment.GetState() - metrics.DataCoordNumSegments.WithLabelValues(string(oldSegmentState)).Dec() - metrics.DataCoordNumSegments.WithLabelValues(string(newSegmentState)).Inc() + metrics.DataCoordNumSegments.WithLabelValues(oldSegmentState.String()).Dec() + metrics.DataCoordNumSegments.WithLabelValues(newSegmentState.String()).Inc() if newSegmentState == commonpb.SegmentState_Flushed { metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(clonedSegment.GetNumOfRows())) metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(clonedSegment.GetNumOfRows())) @@ -448,7 +448,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI for _, seg := range originSegments { state := seg.GetState() metrics.DataCoordNumSegments.WithLabelValues( - string(state)).Dec() + state.String()).Dec() if state == commonpb.SegmentState_Flushed { metrics.DataCoordNumStoredRows.WithLabelValues().Sub(float64(seg.GetNumOfRows())) } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c6b6ebcee0..058c5f7fcb 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/logutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -575,7 +576,9 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat } utcT, _ := tsoutil.ParseHybridTs(ts) - metrics.DataCoordSyncEpoch.WithLabelValues(ch).Set(float64(utcT)) + + pChannelName := funcutil.ToPhysicalChannel(ch) + metrics.DataCoordSyncEpoch.WithLabelValues(pChannelName).Set(float64(utcT)) s.updateSegmentStatistics(ttMsg.GetSegmentsStats()) diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 5f5393fe5b..316691e340 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -159,7 +159,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er // store delDataBuf.updateSize(int64(rows)) - metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(metrics.DeleteLabel, fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Add(float64(rows)) + metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).Add(float64(rows)) delDataBuf.updateTimeRange(tr) dn.delBuf.Store(segID, delDataBuf) } diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index a4cd1ac356..a9f8ce7149 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -215,7 +215,7 @@ func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) { replica.flushedSegments[segID] = &seg delete(replica.newSegments, segID) - metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec() + metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Dec() } // normal2FlushedSegment transfers a segment from *normal* to *flushed* by changing *isFlushed* @@ -287,7 +287,7 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID replica.segMu.Lock() defer replica.segMu.Unlock() replica.newSegments[segID] = seg - metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc() + metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc() return nil } @@ -381,7 +381,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu replica.segMu.Lock() replica.normalSegments[segID] = seg replica.segMu.Unlock() - metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc() + metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc() return nil } @@ -579,7 +579,7 @@ func (replica *SegmentReplica) removeSegments(segIDs ...UniqueID) { cnt++ } } - metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Sub(float64(cnt)) + metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Sub(float64(cnt)) for _, segID := range segIDs { delete(replica.newSegments, segID) diff --git a/internal/metrics/datacoord_metrics.go b/internal/metrics/datacoord_metrics.go index 1e78969fae..026f93e20a 100644 --- a/internal/metrics/datacoord_metrics.go +++ b/internal/metrics/datacoord_metrics.go @@ -137,5 +137,6 @@ func RegisterDataCoord(registry *prometheus.Registry) { registry.MustRegister(DataCoordNumSegments) registry.MustRegister(DataCoordNumCollections) registry.MustRegister(DataCoordNumStoredRows) + registry.MustRegister(DataCoordNumStoredRowsCounter) registry.MustRegister(DataCoordSyncEpoch) } diff --git a/internal/metrics/proxy_metrics.go b/internal/metrics/proxy_metrics.go index 08325f4a1d..308573564f 100644 --- a/internal/metrics/proxy_metrics.go +++ b/internal/metrics/proxy_metrics.go @@ -45,21 +45,11 @@ var ( prometheus.HistogramOpts{ Namespace: milvusNamespace, Subsystem: typeutil.ProxyRole, - Name: "sq_lantency", + Name: "sq_latency", Help: "latency of search", Buckets: buckets, }, []string{nodeIDLabelName, queryTypeLabelName}) - // ProxySendSQReqLatency record the latency that the proxy sent the search request to the message stream. - ProxySendSQReqLatency = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.ProxyRole, - Name: "sq_send_latency", - Help: "latency that proxy sent the search request to the message stream", - Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName, queryTypeLabelName}) - // ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result. ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -249,7 +239,6 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyInsertVectors) registry.MustRegister(ProxySearchLatency) - registry.MustRegister(ProxySendSQReqLatency) registry.MustRegister(ProxyWaitForSearchResultLatency) registry.MustRegister(ProxyReduceResultLatency) registry.MustRegister(ProxyDecodeResultLatency) diff --git a/internal/metrics/querynode_metrics.go b/internal/metrics/querynode_metrics.go index f2d3f8305f..15d76ca247 100644 --- a/internal/metrics/querynode_metrics.go +++ b/internal/metrics/querynode_metrics.go @@ -110,7 +110,7 @@ var ( prometheus.HistogramOpts{ Namespace: milvusNamespace, Subsystem: typeutil.QueryNodeRole, - Name: "sq_queue_lantency", + Name: "sq_queue_latency", Help: "latency of search or query in queue", Buckets: buckets, }, []string{ @@ -285,7 +285,7 @@ var ( prometheus.GaugeOpts{ Namespace: milvusNamespace, Subsystem: typeutil.QueryNodeRole, - Name: "entities_num", + Name: "entity_num", Help: "number of entities which can be searched/queried", }, []string{ nodeIDLabelName, diff --git a/internal/metrics/rootcoord_metrics.go b/internal/metrics/rootcoord_metrics.go index 293bdc1dc8..5d9aa2ce48 100644 --- a/internal/metrics/rootcoord_metrics.go +++ b/internal/metrics/rootcoord_metrics.go @@ -27,7 +27,7 @@ var ( Subsystem: typeutil.RootCoordRole, Name: "sync_epoch_time", Help: "synchronized unix epoch per physical channel", - }, []string{"PChannel"}) + }, []string{channelNameLabelName}) RootCoordDDLReqCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ diff --git a/internal/querynode/search.go b/internal/querynode/search.go index 72a3d24ca4..8e5c8bf165 100644 --- a/internal/querynode/search.go +++ b/internal/querynode/search.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" + "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/util/timerecord" ) @@ -31,6 +32,10 @@ func searchOnSegments(replica ReplicaInterface, segType segmentType, searchReq * // results variables searchResults := make([]*SearchResult, len(segIDs)) errs := make([]error, len(segIDs)) + searchLabel := metrics.SealedSegmentLabel + if segType == commonpb.SegmentState_Growing { + searchLabel = metrics.GrowingSegmentLabel + } // calling segment search in goroutines var wg sync.WaitGroup @@ -50,8 +55,7 @@ func searchOnSegments(replica ReplicaInterface, segType segmentType, searchReq * searchResults[i] = searchResult // update metrics metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), - metrics.SearchLabel, - metrics.SealedSegmentLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.SearchLabel, searchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) }(segID, i) } wg.Wait() diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 10227b1ea1..d98fd69737 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -71,6 +71,8 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref } log.Debug("init dml channels", zap.Int64("num", chanNum)) metrics.RootCoordNumOfDMLChannel.Add(float64(chanNum)) + metrics.RootCoordNumOfMsgStream.Add(float64(chanNum)) + return d } @@ -163,7 +165,6 @@ func (d *dmlChannels) addChannels(names ...string) { dms.refcnt++ dms.mutex.Unlock() } - metrics.RootCoordNumOfDMLChannel.Inc() } func (d *dmlChannels) removeChannels(names ...string) { @@ -183,7 +184,6 @@ func (d *dmlChannels) removeChannels(names ...string) { } dms.mutex.Unlock() } - metrics.RootCoordNumOfDMLChannel.Dec() } func genChannelName(prefix string, idx int64) string {