mirror of https://github.com/milvus-io/milvus.git
Fix bugs in monitoring metrics (#17894)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/17959/head
parent
e1839e3534
commit
0baeb609dc
|
@ -85,7 +85,7 @@ func (m *meta) reloadFromKV() error {
|
|||
}
|
||||
state := segmentInfo.GetState()
|
||||
m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo))
|
||||
metrics.DataCoordNumSegments.WithLabelValues(string(state)).Inc()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(state.String()).Inc()
|
||||
if state == commonpb.SegmentState_Flushed {
|
||||
numStoredRows += segmentInfo.GetNumOfRows()
|
||||
}
|
||||
|
@ -175,7 +175,7 @@ func (m *meta) AddSegment(segment *SegmentInfo) error {
|
|||
if err := m.saveSegmentInfo(segment); err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.DataCoordNumSegments.WithLabelValues(string(segment.GetState())).Inc()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -233,8 +233,8 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
|
|||
if curSegInfo != nil && isSegmentHealthy(curSegInfo) {
|
||||
err := m.saveSegmentInfo(curSegInfo)
|
||||
if err == nil {
|
||||
metrics.DataCoordNumSegments.WithLabelValues(string(oldState)).Dec()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(string(state)).Inc()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(oldState.String()).Dec()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(state.String()).Inc()
|
||||
if state == commonpb.SegmentState_Flushed {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(curSegInfo.GetNumOfRows()))
|
||||
metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(curSegInfo.GetNumOfRows()))
|
||||
|
@ -397,8 +397,8 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
}
|
||||
oldSegmentState := segment.GetState()
|
||||
newSegmentState := clonedSegment.GetState()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(string(oldSegmentState)).Dec()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(string(newSegmentState)).Inc()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(oldSegmentState.String()).Dec()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(newSegmentState.String()).Inc()
|
||||
if newSegmentState == commonpb.SegmentState_Flushed {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues().Add(float64(clonedSegment.GetNumOfRows()))
|
||||
metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(clonedSegment.GetNumOfRows()))
|
||||
|
@ -448,7 +448,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
|
|||
for _, seg := range originSegments {
|
||||
state := seg.GetState()
|
||||
metrics.DataCoordNumSegments.WithLabelValues(
|
||||
string(state)).Dec()
|
||||
state.String()).Dec()
|
||||
if state == commonpb.SegmentState_Flushed {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues().Sub(float64(seg.GetNumOfRows()))
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/logutil"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
|
@ -575,7 +576,9 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
|
|||
}
|
||||
|
||||
utcT, _ := tsoutil.ParseHybridTs(ts)
|
||||
metrics.DataCoordSyncEpoch.WithLabelValues(ch).Set(float64(utcT))
|
||||
|
||||
pChannelName := funcutil.ToPhysicalChannel(ch)
|
||||
metrics.DataCoordSyncEpoch.WithLabelValues(pChannelName).Set(float64(utcT))
|
||||
|
||||
s.updateSegmentStatistics(ttMsg.GetSegmentsStats())
|
||||
|
||||
|
|
|
@ -159,7 +159,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
|
|||
|
||||
// store
|
||||
delDataBuf.updateSize(int64(rows))
|
||||
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(metrics.DeleteLabel, fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Add(float64(rows))
|
||||
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.DeleteLabel).Add(float64(rows))
|
||||
delDataBuf.updateTimeRange(tr)
|
||||
dn.delBuf.Store(segID, delDataBuf)
|
||||
}
|
||||
|
|
|
@ -215,7 +215,7 @@ func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) {
|
|||
replica.flushedSegments[segID] = &seg
|
||||
|
||||
delete(replica.newSegments, segID)
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Dec()
|
||||
}
|
||||
|
||||
// normal2FlushedSegment transfers a segment from *normal* to *flushed* by changing *isFlushed*
|
||||
|
@ -287,7 +287,7 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
|
|||
replica.segMu.Lock()
|
||||
defer replica.segMu.Unlock()
|
||||
replica.newSegments[segID] = seg
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -381,7 +381,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
|
|||
replica.segMu.Lock()
|
||||
replica.normalSegments[segID] = seg
|
||||
replica.segMu.Unlock()
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -579,7 +579,7 @@ func (replica *SegmentReplica) removeSegments(segIDs ...UniqueID) {
|
|||
cnt++
|
||||
}
|
||||
}
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Sub(float64(cnt))
|
||||
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Sub(float64(cnt))
|
||||
|
||||
for _, segID := range segIDs {
|
||||
delete(replica.newSegments, segID)
|
||||
|
|
|
@ -137,5 +137,6 @@ func RegisterDataCoord(registry *prometheus.Registry) {
|
|||
registry.MustRegister(DataCoordNumSegments)
|
||||
registry.MustRegister(DataCoordNumCollections)
|
||||
registry.MustRegister(DataCoordNumStoredRows)
|
||||
registry.MustRegister(DataCoordNumStoredRowsCounter)
|
||||
registry.MustRegister(DataCoordSyncEpoch)
|
||||
}
|
||||
|
|
|
@ -45,21 +45,11 @@ var (
|
|||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "sq_lantency",
|
||||
Name: "sq_latency",
|
||||
Help: "latency of search",
|
||||
Buckets: buckets,
|
||||
}, []string{nodeIDLabelName, queryTypeLabelName})
|
||||
|
||||
// ProxySendSQReqLatency record the latency that the proxy sent the search request to the message stream.
|
||||
ProxySendSQReqLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "sq_send_latency",
|
||||
Help: "latency that proxy sent the search request to the message stream",
|
||||
Buckets: buckets, // unit: ms
|
||||
}, []string{nodeIDLabelName, queryTypeLabelName})
|
||||
|
||||
// ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result.
|
||||
ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
|
@ -249,7 +239,6 @@ func RegisterProxy(registry *prometheus.Registry) {
|
|||
registry.MustRegister(ProxyInsertVectors)
|
||||
|
||||
registry.MustRegister(ProxySearchLatency)
|
||||
registry.MustRegister(ProxySendSQReqLatency)
|
||||
registry.MustRegister(ProxyWaitForSearchResultLatency)
|
||||
registry.MustRegister(ProxyReduceResultLatency)
|
||||
registry.MustRegister(ProxyDecodeResultLatency)
|
||||
|
|
|
@ -110,7 +110,7 @@ var (
|
|||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.QueryNodeRole,
|
||||
Name: "sq_queue_lantency",
|
||||
Name: "sq_queue_latency",
|
||||
Help: "latency of search or query in queue",
|
||||
Buckets: buckets,
|
||||
}, []string{
|
||||
|
@ -285,7 +285,7 @@ var (
|
|||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.QueryNodeRole,
|
||||
Name: "entities_num",
|
||||
Name: "entity_num",
|
||||
Help: "number of entities which can be searched/queried",
|
||||
}, []string{
|
||||
nodeIDLabelName,
|
||||
|
|
|
@ -27,7 +27,7 @@ var (
|
|||
Subsystem: typeutil.RootCoordRole,
|
||||
Name: "sync_epoch_time",
|
||||
Help: "synchronized unix epoch per physical channel",
|
||||
}, []string{"PChannel"})
|
||||
}, []string{channelNameLabelName})
|
||||
|
||||
RootCoordDDLReqCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
)
|
||||
|
||||
|
@ -31,6 +32,10 @@ func searchOnSegments(replica ReplicaInterface, segType segmentType, searchReq *
|
|||
// results variables
|
||||
searchResults := make([]*SearchResult, len(segIDs))
|
||||
errs := make([]error, len(segIDs))
|
||||
searchLabel := metrics.SealedSegmentLabel
|
||||
if segType == commonpb.SegmentState_Growing {
|
||||
searchLabel = metrics.GrowingSegmentLabel
|
||||
}
|
||||
|
||||
// calling segment search in goroutines
|
||||
var wg sync.WaitGroup
|
||||
|
@ -50,8 +55,7 @@ func searchOnSegments(replica ReplicaInterface, segType segmentType, searchReq *
|
|||
searchResults[i] = searchResult
|
||||
// update metrics
|
||||
metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
|
||||
metrics.SearchLabel,
|
||||
metrics.SealedSegmentLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.SearchLabel, searchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
}(segID, i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
|
|
@ -71,6 +71,8 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
|
|||
}
|
||||
log.Debug("init dml channels", zap.Int64("num", chanNum))
|
||||
metrics.RootCoordNumOfDMLChannel.Add(float64(chanNum))
|
||||
metrics.RootCoordNumOfMsgStream.Add(float64(chanNum))
|
||||
|
||||
return d
|
||||
}
|
||||
|
||||
|
@ -163,7 +165,6 @@ func (d *dmlChannels) addChannels(names ...string) {
|
|||
dms.refcnt++
|
||||
dms.mutex.Unlock()
|
||||
}
|
||||
metrics.RootCoordNumOfDMLChannel.Inc()
|
||||
}
|
||||
|
||||
func (d *dmlChannels) removeChannels(names ...string) {
|
||||
|
@ -183,7 +184,6 @@ func (d *dmlChannels) removeChannels(names ...string) {
|
|||
}
|
||||
dms.mutex.Unlock()
|
||||
}
|
||||
metrics.RootCoordNumOfDMLChannel.Dec()
|
||||
}
|
||||
|
||||
func genChannelName(prefix string, idx int64) string {
|
||||
|
|
Loading…
Reference in New Issue