mirror of https://github.com/milvus-io/milvus.git
enhance: add detail, replica count for resource group (#38314)
issue: #30647

Signed-off-by: chyezh <chyezh@outlook.com>
parent 10460ed3f8
commit 833c74aa66
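In summary: the commit introduces three QueryCoord gauges — resource_group_info (one series per resource group and node), resource_group_replica_total (one series per resource group), and replica_ro_node_total — keeps them current from the ReplicaManager and ResourceManager code paths, and exports the shared "rg" label constant so both packages can reference it.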
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -187,8 +188,14 @@ func (m *ReplicaManager) put(ctx context.Context, replicas ...*Replica) error {
 // putReplicaInMemory puts replicas into in-memory map and collIDToReplicaIDs.
 func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) {
 	for _, replica := range replicas {
+		if oldReplica, ok := m.replicas[replica.GetID()]; ok {
+			metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(oldReplica.GetResourceGroup()).Dec()
+			metrics.QueryCoordReplicaRONodeTotal.Add(-float64(oldReplica.RONodesCount()))
+		}
 		// update in-memory replicas.
 		m.replicas[replica.GetID()] = replica
+		metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(replica.GetResourceGroup()).Inc()
+		metrics.QueryCoordReplicaRONodeTotal.Add(float64(replica.RONodesCount()))

 		// update collIDToReplicaIDs.
 		if m.coll2Replicas[replica.GetCollectionID()] == nil {
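The hunk above makes putReplicaInMemory keep two gauges consistent on every upsert: first decrement the series for the replica being overwritten (if any), then increment the series for the incoming one. A minimal, self-contained sketch of that pattern outside Milvus (the map and all names are illustrative, not Milvus code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// replicaTotal mirrors QueryCoordResourceGroupReplicaTotal: one gauge series per resource group.
var replicaTotal = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "resource_group_replica_total"},
	[]string{"rg"},
)

// upsert re-homes a replica's gauge when its resource group changes:
// decrement the old label series first, then increment the new one,
// so per-group totals never double-count an overwritten replica.
func upsert(current map[int64]string, replicaID int64, newRG string) {
	if oldRG, ok := current[replicaID]; ok {
		replicaTotal.WithLabelValues(oldRG).Dec()
	}
	current[replicaID] = newRG
	replicaTotal.WithLabelValues(newRG).Inc()
}

func main() {
	current := map[int64]string{}
	upsert(current, 1, "rg_a")
	upsert(current, 1, "rg_b") // move replica 1 from rg_a to rg_b
	fmt.Println(testutil.ToFloat64(replicaTotal.WithLabelValues("rg_a"))) // 0
	fmt.Println(testutil.ToFloat64(replicaTotal.WithLabelValues("rg_b"))) // 1
}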
@@ -280,6 +287,8 @@ func (m *ReplicaManager) RemoveCollection(ctx context.Context, collectionID type
 	if collReplicas, ok := m.coll2Replicas[collectionID]; ok {
 		// Remove all replica of collection and remove collection from collIDToReplicaIDs.
 		for _, replica := range collReplicas.replicas {
+			metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(replica.GetResourceGroup()).Dec()
+			metrics.QueryCoordReplicaRONodeTotal.Add(-float64(replica.RONodesCount()))
 			delete(m.replicas, replica.GetID())
 		}
 		delete(m.coll2Replicas, collectionID)

@@ -302,8 +311,12 @@ func (m *ReplicaManager) removeReplicas(ctx context.Context, collectionID typeut
 		return err
 	}

-	for _, replica := range replicas {
-		delete(m.replicas, replica)
+	for _, replicaID := range replicas {
+		if replica, ok := m.replicas[replicaID]; ok {
+			metrics.QueryCoordResourceGroupReplicaTotal.WithLabelValues(replica.GetResourceGroup()).Dec()
+			metrics.QueryCoordReplicaRONodeTotal.Add(float64(-replica.RONodesCount()))
+			delete(m.replicas, replicaID)
+		}
 	}

 	if m.coll2Replicas[collectionID].removeReplicas(replicas...) {
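Note why removeReplicas switches from a blind delete to lookup-then-delete: the gauge is keyed by the replica's resource group, so the replica object must be fetched before its map entry disappears, otherwise there is no label value left to Dec. A standalone sketch of the same ordering (illustrative names, not Milvus code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var replicaTotal = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "resource_group_replica_total"},
	[]string{"rg"},
)

type replica struct{ rg string }

// remove looks the replica up *before* deleting it: once the map entry is
// gone, the resource-group label needed to decrement the right series is lost.
func remove(replicas map[int64]*replica, id int64) {
	if r, ok := replicas[id]; ok {
		replicaTotal.WithLabelValues(r.rg).Dec()
		delete(replicas, id)
	}
}

func main() {
	replicas := map[int64]*replica{7: {rg: "rg_a"}}
	replicaTotal.WithLabelValues("rg_a").Inc()
	remove(replicas, 7)
	fmt.Println(testutil.ToFloat64(replicaTotal.WithLabelValues("rg_a"))) // 0
}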
@@ -19,9 +19,11 @@ package meta
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"sync"

 	"github.com/cockroachdb/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 	"google.golang.org/protobuf/proto"

@@ -32,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"

@@ -93,7 +96,7 @@ func (rm *ResourceManager) Recover(ctx context.Context) error {
 		needUpgrade := meta.Config == nil

 		rg := NewResourceGroupFromMeta(meta, rm.nodeMgr)
-		rm.groups[rg.GetName()] = rg
+		rm.setupInMemResourceGroup(rg)
 		for _, node := range rg.GetNodes() {
 			if _, ok := rm.nodeIDMap[node]; ok {
 				// unreachable code, should never happen.

@@ -158,7 +161,7 @@ func (rm *ResourceManager) AddResourceGroup(ctx context.Context, rgName string,
 		return merr.WrapErrResourceGroupServiceAvailable()
 	}

-	rm.groups[rgName] = rg
+	rm.setupInMemResourceGroup(rg)
 	log.Info("add resource group",
 		zap.String("rgName", rgName),
 		zap.Any("config", cfg),

@@ -218,7 +221,7 @@ func (rm *ResourceManager) updateResourceGroups(ctx context.Context, rgs map[str
 			zap.String("rgName", rg.GetName()),
 			zap.Any("config", rg.GetConfig()),
 		)
-		rm.groups[rg.GetName()] = rg
+		rm.setupInMemResourceGroup(rg)
 	}

 	// notify that resource group config has been changed.

@@ -318,6 +321,12 @@ func (rm *ResourceManager) RemoveResourceGroup(ctx context.Context, rgName strin
 	// After recovering, all node assigned to these rg has been removed.
 	// no secondary index need to be removed.
 	delete(rm.groups, rgName)
+	metrics.QueryCoordResourceGroupInfo.DeletePartialMatch(prometheus.Labels{
+		metrics.ResourceGroupLabelName: rgName,
+	})
+	metrics.QueryCoordResourceGroupReplicaTotal.DeletePartialMatch(prometheus.Labels{
+		metrics.ResourceGroupLabelName: rgName,
+	})

 	log.Info("remove resource group",
 		zap.String("rgName", rgName),

@@ -840,7 +849,7 @@ func (rm *ResourceManager) transferNode(ctx context.Context, rgName string, node

 	// Commit updates to memory.
 	for _, rg := range modifiedRG {
-		rm.groups[rg.GetName()] = rg
+		rm.setupInMemResourceGroup(rg)
 	}
 	rm.nodeIDMap[node] = rgName
 	log.Info("transfer node to resource group",

@@ -870,7 +879,7 @@ func (rm *ResourceManager) unassignNode(ctx context.Context, node int64) (string
 	}

 	// Commit updates to memory.
-	rm.groups[rg.GetName()] = rg
+	rm.setupInMemResourceGroup(rg)
 	delete(rm.nodeIDMap, node)
 	log.Info("unassign node to resource group",
 		zap.String("rgName", rg.GetName()),

@@ -945,6 +954,27 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error
 	return nil
 }

+// setupInMemResourceGroup setup resource group in memory.
+func (rm *ResourceManager) setupInMemResourceGroup(r *ResourceGroup) {
+	// clear old metrics.
+	if oldR, ok := rm.groups[r.GetName()]; ok {
+		for _, nodeID := range oldR.GetNodes() {
+			metrics.QueryCoordResourceGroupInfo.DeletePartialMatch(prometheus.Labels{
+				metrics.ResourceGroupLabelName: r.GetName(),
+				metrics.NodeIDLabelName:        strconv.FormatInt(nodeID, 10),
+			})
+		}
+	}
+	// add new metrics.
+	for _, nodeID := range r.GetNodes() {
+		metrics.QueryCoordResourceGroupInfo.WithLabelValues(
+			r.GetName(),
+			strconv.FormatInt(nodeID, 10),
+		).Set(1)
+	}
+	rm.groups[r.GetName()] = r
+}
+
 func (rm *ResourceManager) GetResourceGroupsJSON(ctx context.Context) string {
 	rm.rwmutex.RLock()
 	defer rm.rwmutex.RUnlock()
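setupInMemResourceGroup and RemoveResourceGroup rely on GaugeVec.DeletePartialMatch (available in recent prometheus/client_golang releases) to drop every series carrying a resource group's label in one call, whatever the node_id. A minimal standalone demonstration — the metric and label names mirror the diff, but the program itself is illustrative:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var rgInfo = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "resource_group_info"},
	[]string{"rg", "node_id"},
)

func main() {
	// One "membership" series per (resource group, node) pair, value fixed at 1.
	rgInfo.WithLabelValues("rg_a", "1").Set(1)
	rgInfo.WithLabelValues("rg_a", "2").Set(1)
	rgInfo.WithLabelValues("rg_b", "3").Set(1)

	// Dropping a resource group removes every series matching its label,
	// regardless of node_id — the behavior RemoveResourceGroup relies on.
	rgInfo.DeletePartialMatch(prometheus.Labels{"rg": "rg_a"})

	fmt.Println(testutil.CollectAndCount(rgInfo)) // 1: only rg_b/3 remains
}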
@@ -88,6 +88,7 @@ const (
 	segmentPruneLabelName    = "segment_prune_label"
 	stageLabelName           = "compaction_stage"
+	nodeIDLabelName          = "node_id"
 	nodeHostLabelName        = "node_host"
 	statusLabelName          = "status"
 	indexTaskStatusLabelName = "index_task_status"
 	msgTypeLabelName         = "msg_type"
@@ -98,7 +99,7 @@ const (
 	queryTypeLabelName     = "query_type"
 	collectionName         = "collection_name"
 	databaseLabelName      = "db_name"
-	resourceGroupLabelName = "rg"
+	ResourceGroupLabelName = "rg"
 	indexName              = "index_name"
 	isVectorIndex          = "is_vector_index"
 	segmentStateLabelName  = "segment_state"
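The rename resourceGroupLabelName → ResourceGroupLabelName (echoed in every query-node hunk below) is a Go visibility change, not a metric change: the label value stays "rg", but only a capitalized identifier is exported from a package, which the prometheus.Labels{metrics.ResourceGroupLabelName: ...} calls in the resource-manager hunks above require. In sketch form (an illustrative stand-in for pkg/metrics, not the actual file):

package metrics

// Unexported: visible only inside package metrics, so querycoordv2
// code could never have referenced it.
const resourceGroupLabelName = "rg"

// Exported: importers can write metrics.ResourceGroupLabelName,
// e.g. when building prometheus.Labels for DeletePartialMatch.
const ResourceGroupLabelName = "rg"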
@@ -132,6 +132,30 @@ var (
 			Help:    "latency of all kind of task in query coord scheduler scheduler",
 			Buckets: longTaskBuckets,
 		}, []string{collectionIDLabelName, taskTypeLabel, channelNameLabelName})
+
+	QueryCoordResourceGroupInfo = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "resource_group_info",
+			Help:      "all resource group detail info in query coord",
+		}, []string{ResourceGroupLabelName, NodeIDLabelName})
+
+	QueryCoordResourceGroupReplicaTotal = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "resource_group_replica_total",
+			Help:      "total replica number of resource group",
+		}, []string{ResourceGroupLabelName})
+
+	QueryCoordReplicaRONodeTotal = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryCoordRole,
+			Name:      "replica_ro_node_total",
+			Help:      "total read only node number of replica",
+		})
 )

 // RegisterQueryCoord registers QueryCoord metrics
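For reference, Prometheus joins Namespace, Subsystem and Name with underscores, so — assuming milvusNamespace resolves to "milvus" and typeutil.QueryCoordRole to "querycoord", as elsewhere in pkg/metrics — the three new gauges are exposed as milvus_querycoord_resource_group_info, milvus_querycoord_resource_group_replica_total and milvus_querycoord_replica_ro_node_total. A standalone sketch of the naming (values and the resource-group name are illustrative):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Stand-in for QueryCoordResourceGroupReplicaTotal with the namespace
	// and subsystem spelled out as literals.
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "querycoord",
		Name:      "resource_group_replica_total",
	}, []string{"rg"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(g)
	g.WithLabelValues("rg_a").Set(2)

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName()) // milvus_querycoord_resource_group_replica_total
	}
}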
@@ -146,6 +170,9 @@ func RegisterQueryCoord(registry *prometheus.Registry) {
 	registry.MustRegister(QueryCoordNumQueryNodes)
 	registry.MustRegister(QueryCoordCurrentTargetCheckpointUnixSeconds)
 	registry.MustRegister(QueryCoordTaskLatency)
+	registry.MustRegister(QueryCoordResourceGroupInfo)
+	registry.MustRegister(QueryCoordResourceGroupReplicaTotal)
+	registry.MustRegister(QueryCoordReplicaRONodeTotal)
 }

 func CleanQueryCoordMetricsWithCollectionID(collectionID int64) {
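RegisterQueryCoord simply attaches the new collectors to the registry QueryCoord already exposes. A minimal, standalone illustration of the same wiring behind an HTTP scrape endpoint — the registry, port, and gauge here are illustrative, not Milvus's actual setup:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Scratch registry standing in for the one RegisterQueryCoord receives.
	reg := prometheus.NewRegistry()
	roNodes := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "querycoord",
		Name:      "replica_ro_node_total",
		Help:      "total read only node number of replica",
	})
	reg.MustRegister(roNodes)
	roNodes.Add(2)

	// Scrape endpoint; `curl localhost:2112/metrics` shows the gauge.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}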
@@ -201,7 +201,7 @@ var (
 			nodeIDLabelName,
 			queryTypeLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	QueryNodeSQPerUserLatencyInQueue = prometheus.NewHistogramVec(

@@ -602,7 +602,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		},
 	)

@@ -617,7 +617,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		},
 	)

@@ -646,7 +646,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		})

@@ -660,7 +660,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 			queryTypeLabelName,
 		})

@@ -687,7 +687,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheLoadBytes records the number of bytes loaded from disk cache.

@@ -700,7 +700,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheLoadDuration records the total time cost of loading segments from disk cache.

@@ -714,7 +714,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheLoadGlobalDuration records the global time cost of loading segments from disk cache.

@@ -739,7 +739,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheEvictBytes records the number of bytes evicted from disk cache.

@@ -752,7 +752,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheEvictDuration records the total time cost of evicting segments from disk cache.

@@ -765,7 +765,7 @@ var (
 		}, []string{
 			nodeIDLabelName,
 			databaseLabelName,
-			resourceGroupLabelName,
+			ResourceGroupLabelName,
 		})

 	// QueryNodeDiskCacheEvictGlobalDuration records the global time cost of evicting segments from disk cache.