mirror of https://github.com/milvus-io/milvus.git
enhance: [2.5] Remove unnecessary collection and partition label from the metrics (#40593)
/kind improvement

pr: https://github.com/milvus-io/milvus/pull/39536

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent 9542271554
commit cd8f1fe0e4
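The change follows one mechanical pattern throughout: a label is dropped from a prometheus metric vector, so the label list in the declaration shrinks and every WithLabelValues call site loses the matching positional argument. A minimal standalone sketch of that pattern (hypothetical metric and label names, not the Milvus declarations):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Before: three labels, so every call site passes three values.
var latencyBefore = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "example_compaction_latency", // hypothetical name
}, []string{"is_vector", "collection_id", "channel"})

// After: collection_id is dropped, so call sites pass two values.
var latencyAfter = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "example_compaction_latency_v2", // hypothetical name
}, []string{"is_vector", "channel"})

func main() {
	latencyBefore.WithLabelValues("true", fmt.Sprint(42), "ch-1").Observe(1.5)
	// With the label removed, the old three-argument call would panic with
	// an inconsistent-cardinality error, so every caller changes in lockstep.
	latencyAfter.WithLabelValues("true", "ch-1").Observe(1.5)
}

client_golang panics on a label-count mismatch, which is why the declaration and all of its call sites have to change in the same commit, as the hunks below do.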
@@ -112,7 +112,7 @@ func (t *clusteringCompactionTask) Process() bool {
 		lastStateDuration := ts - t.GetTaskProto().GetLastStateStartTime()
 		log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
 		metrics.DataCoordCompactionLatency.
-			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
+			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
 			Observe(float64(lastStateDuration * 1000))
 		updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}
@@ -121,7 +121,7 @@ func (t *clusteringCompactionTask) Process() bool {
 		elapse := ts - t.GetTaskProto().StartTime
 		log.Info("clustering compaction task total elapse", zap.Duration("costs", time.Duration(elapse)*time.Second))
 		metrics.DataCoordCompactionLatency.
-			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), fmt.Sprint(t.GetTaskProto().CollectionID), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
+			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetTaskProto().GetClusteringKeyField().DataType)), t.GetTaskProto().Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
 			Observe(float64(elapse * 1000))
 	}
 	err = t.updateAndSaveTaskMeta(updateOps...)
@@ -114,7 +114,8 @@ func (c *importChecker) Start() {
 			for collID, collJobs := range jobsByColl {
 				c.checkCollection(collID, collJobs)
 			}
-			c.LogStats()
+			c.LogJobStats(jobs)
+			c.LogTaskStats()
 		}
 	}
 }
@@ -125,7 +126,23 @@ func (c *importChecker) Close() {
 	})
 }

-func (c *importChecker) LogStats() {
+func (c *importChecker) LogJobStats(jobs []ImportJob) {
+	byState := lo.GroupBy(jobs, func(job ImportJob) string {
+		return job.GetState().String()
+	})
+	stateNum := make(map[string]int)
+	for state := range internalpb.ImportJobState_value {
+		if state == internalpb.ImportJobState_None.String() {
+			continue
+		}
+		num := len(byState[state])
+		stateNum[state] = num
+		metrics.ImportJobs.WithLabelValues(state).Set(float64(num))
+	}
+	log.Info("import job stats", zap.Any("stateNum", stateNum))
+}
+
+func (c *importChecker) LogTaskStats() {
 	logFunc := func(tasks []ImportTask, taskType TaskType) {
 		byState := lo.GroupBy(tasks, func(t ImportTask) datapb.ImportTaskStateV2 {
 			return t.GetState()
@@ -139,7 +139,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
 	err = s.imeta.AddTask(context.TODO(), it1)
 	s.NoError(err)

-	s.checker.LogStats()
+	s.checker.LogTaskStats()
 }

 func (s *ImportCheckerSuite) TestCheckJob() {
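The new LogJobStats iterates the full ImportJobState enum rather than only the states present in the grouped map, so a state whose last job just finished is reset to zero instead of keeping a stale gauge value. A standalone sketch of the same idiom, with a toy state list standing in for internalpb.ImportJobState_value:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/samber/lo"
)

// Toy stand-ins: jobStates plays the role of internalpb.ImportJobState_value,
// importJobs the role of metrics.ImportJobs.
var jobStates = []string{"Pending", "Importing", "Completed", "Failed"}

var importJobs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_import_jobs", // hypothetical name
}, []string{"import_state"})

// logJobStats mirrors the shape of importChecker.LogJobStats above.
func logJobStats(observed []string) {
	byState := lo.GroupBy(observed, func(s string) string { return s })
	for _, state := range jobStates {
		// Walking the full enum zeroes the gauge for states with no
		// jobs, instead of leaving a stale value behind.
		importJobs.WithLabelValues(state).Set(float64(len(byState[state])))
	}
	fmt.Println("import job stats:", byState)
}

func main() {
	logJobStats([]string{"Pending", "Pending", "Completed"})
}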
@@ -92,11 +92,8 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
 		metrics.QueryNodeEntitiesSize.WithLabelValues(nodeID, fmt.Sprint(collection),
 			segments.SegmentTypeGrowing.String()).Set(float64(size))
 	}
-	growingGroupByPartition := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 {
-		return seg.Partition()
-	})

-	for _, segs := range growingGroupByPartition {
+	for _, segs := range growingGroupByCollection {
 		numEntities := lo.SumBy(segs, func(seg segments.Segment) int64 {
 			return seg.RowNum()
 		})
@@ -106,7 +103,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
 			collections[segment.Collection()],
 			nodeID,
 			fmt.Sprint(segment.Collection()),
-			fmt.Sprint(segment.Partition()),
 			segments.SegmentTypeGrowing.String(),
 		).Set(float64(numEntities))
 	}
@@ -136,7 +132,6 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
 			collections[segment.Collection()],
 			nodeID,
 			fmt.Sprint(segment.Collection()),
-			fmt.Sprint(segment.Partition()),
 			segments.SegmentTypeSealed.String(),
 		).Set(float64(numEntities))
 	}
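The quota-metrics change swaps the aggregation key: entity counts are summed per collection rather than per partition. The lo.GroupBy/lo.SumBy idiom it relies on, sketched with a toy segment type (not the Milvus Segment interface):

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// Toy segment carrying just the two fields the aggregation needs.
type segment struct {
	collectionID int64
	rowNum       int64
}

func main() {
	segs := []segment{{1, 100}, {1, 50}, {2, 70}}

	// New key: collection ID (the removed code grouped by partition ID).
	byCollection := lo.GroupBy(segs, func(s segment) int64 { return s.collectionID })

	for collID, group := range byCollection {
		numEntities := lo.SumBy(group, func(s segment) int64 { return s.rowNum })
		fmt.Printf("collection %d: %d entities\n", collID, numEntities)
	}
}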
@@ -409,9 +409,7 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
 		metrics.QueryNodeNumSegments.WithLabelValues(
 			fmt.Sprint(paramtable.GetNodeID()),
-			fmt.Sprint(segment.Collection()),
-			fmt.Sprint(segment.Partition()),
 			segment.Type().String(),
 			fmt.Sprint(len(segment.Indexes())),
 			segment.Level().String(),
 		).Inc()
 	}
@@ -736,9 +734,7 @@ func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
 	metrics.QueryNodeNumSegments.WithLabelValues(
 		fmt.Sprint(paramtable.GetNodeID()),
-		fmt.Sprint(segment.Collection()),
-		fmt.Sprint(segment.Partition()),
 		segment.Type().String(),
 		fmt.Sprint(len(segment.Indexes())),
 		segment.Level().String(),
 	).Dec()

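Put and release form an Inc/Dec pair on one gauge, so both sides must pass identical label tuples; removing the collection and partition values from only one side would leave incremented series that are never decremented. A sketch of the invariant, with a hypothetical gauge shaped like the slimmed-down QueryNodeNumSegments:

package main

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical gauge shaped like QueryNodeNumSegments after this commit:
// collection_id and partition_id are gone from the label set.
var numSegments = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_num_segments",
}, []string{"node_id", "segment_state", "indexed_field_count", "segment_level"})

func put(nodeID, state, indexCount, level string) {
	numSegments.WithLabelValues(nodeID, state, indexCount, level).Inc()
}

func release(nodeID, state, indexCount, level string) {
	// Dec must use exactly the label values Inc used; a mismatched tuple
	// would create a new series and leave the old one stuck above zero.
	numSegments.WithLabelValues(nodeID, state, indexCount, level).Dec()
}

func main() {
	put("1", "Sealed", "2", "L1")
	release("1", "Sealed", "2", "L1")
}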
@@ -546,10 +546,8 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
 	l0Segment := segments.NewMockSegment(suite.T())
 	l0Segment.EXPECT().ID().Return(10000)
-	l0Segment.EXPECT().Collection().Return(suite.collectionID)
-	l0Segment.EXPECT().Partition().Return(common.AllPartitionsID)
 	l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0)
 	l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed)
 	l0Segment.EXPECT().Indexes().Return(nil)
 	l0Segment.EXPECT().Shard().Return(suite.channel)
 	l0Segment.EXPECT().Release(ctx).Return()

@@ -206,7 +206,6 @@ var (
 			Buckets: longTaskBuckets,
 		}, []string{
 			isVectorFieldLabelName,
-			collectionIDLabelName,
 			channelNameLabelName,
 			compactionTypeLabelName,
 			stageLabelName,
@@ -328,6 +327,14 @@ var (
 			Help: "number of IndexNodes managed by IndexCoord",
 		}, []string{})

+	ImportJobs = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataCoordRole,
+			Name:      "import_jobs",
+			Help:      "the import jobs grouping by state",
+		}, []string{"import_state"})
+
 	ImportTasks = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: milvusNamespace,
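A new vector such as ImportJobs only shows up on the metrics endpoint once it is registered. A minimal, self-contained sketch of declaring, registering, and setting a gauge of this shape (default registry and illustrative names, not Milvus's wiring):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Shaped like the new ImportJobs metric; names are illustrative only.
var importJobs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "example",
	Subsystem: "datacoord",
	Name:      "import_jobs",
	Help:      "import jobs grouped by state",
}, []string{"import_state"})

func main() {
	prometheus.MustRegister(importJobs)
	importJobs.WithLabelValues("Importing").Set(3)

	// Scraping :2112/metrics now shows
	// example_datacoord_import_jobs{import_state="Importing"} 3.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}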
@@ -103,18 +103,6 @@ var (
 			collectionIDLabelName,
 		})

-	DataNodeProduceTimeTickLag = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Namespace: milvusNamespace,
-			Subsystem: typeutil.DataNodeRole,
-			Name:      "produce_tt_lag_ms",
-			Help:      "now time minus tt pts per physical channel",
-		}, []string{
-			nodeIDLabelName,
-			collectionIDLabelName,
-			channelNameLabelName,
-		})
-
 	DataNodeConsumeMsgCount = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: milvusNamespace,
@@ -295,7 +283,6 @@ func RegisterDataNode(registry *prometheus.Registry) {
 	// deprecated metrics
 	registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
 	registry.MustRegister(DataNodeNumProducers)
-	registry.MustRegister(DataNodeProduceTimeTickLag)
 }

 func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
@@ -307,14 +294,6 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
 			collectionIDLabelName: fmt.Sprint(collectionID),
 		})

-	DataNodeProduceTimeTickLag.
-		Delete(
-			prometheus.Labels{
-				nodeIDLabelName:       fmt.Sprint(nodeID),
-				collectionIDLabelName: fmt.Sprint(collectionID),
-				channelNameLabelName:  channel,
-			})
-
 	for _, label := range []string{AllLabel, DeleteLabel, InsertLabel} {
 		DataNodeConsumeMsgCount.
 			Delete(
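CleanupDataNodeCollectionMetrics drops the per-series cleanup for DataNodeProduceTimeTickLag along with the metric itself. The client_golang deletion mechanism it uses, sketched with a hypothetical gauge (DeletePartialMatch is assumed available, as in recent client_golang releases):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var ttLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_tt_lag_ms", // hypothetical metric
}, []string{"node_id", "collection_id", "channel_name"})

func main() {
	ttLag.WithLabelValues("1", "100", "ch-1").Set(5)

	// Delete removes the one series whose labels match exactly...
	ok := ttLag.Delete(prometheus.Labels{
		"node_id":       "1",
		"collection_id": "100",
		"channel_name":  "ch-1",
	})
	fmt.Println("deleted:", ok)

	// ...while DeletePartialMatch drops every series matching a subset,
	// handy when only the collection ID is known at cleanup time.
	ttLag.WithLabelValues("1", "100", "ch-2").Set(7)
	n := ttLag.DeletePartialMatch(prometheus.Labels{"collection_id": "100"})
	fmt.Println("partially matched deletions:", n)
}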
@@ -94,7 +94,6 @@ const (
 	indexTaskStatusLabelName = "index_task_status"
 	msgTypeLabelName         = "msg_type"
 	collectionIDLabelName    = "collection_id"
-	partitionIDLabelName     = "partition_id"
 	channelNameLabelName     = "channel_name"
 	functionLabelName        = "function_name"
 	queryTypeLabelName       = "query_type"
@@ -110,7 +109,6 @@ const (
 	roleNameLabelName    = "role_name"
 	cacheNameLabelName   = "cache_name"
 	cacheStateLabelName  = "cache_state"
 	indexCountLabelName  = "indexed_field_count"
 	dataSourceLabelName  = "data_source"
-	importStageLabelName = "import_stage"
 	requestScope         = "scope"
@@ -125,9 +125,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
-			collectionIDLabelName,
-			partitionIDLabelName,
 			segmentStateLabelName,
 			indexCountLabelName,
 			segmentLevelLabelName,
 		})

@@ -455,7 +453,6 @@ var (
 			collectionName,
 			nodeIDLabelName,
 			collectionIDLabelName,
-			partitionIDLabelName,
 			segmentStateLabelName,
 		})