mirror of https://github.com/milvus-io/milvus.git
enhance: Add db label for some usual metrics (#32024)
issue: https://github.com/milvus-io/milvus/issues/31782 pr: #30956 cherry pick: #30956 #32003 #31715 --------- Signed-off-by: jaime <yun.zhang@zilliz.com> Signed-off-by: chyezh <chyezh@outlook.com> Co-authored-by: chyezh <chyezh@outlook.com>pull/32142/head
parent
fcef1c54da
commit
bb2ffd47f6
|
@ -84,6 +84,7 @@ type collectionInfo struct {
|
|||
StartPositions []*commonpb.KeyDataPair
|
||||
Properties map[string]string
|
||||
CreatedAt Timestamp
|
||||
DatabaseName string
|
||||
}
|
||||
|
||||
// NewMeta creates meta from provided `kv.TxnKV`
|
||||
|
@ -198,6 +199,7 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
|
|||
Partitions: coll.Partitions,
|
||||
StartPositions: common.CloneKeyDataPairs(coll.StartPositions),
|
||||
Properties: clonedProperties,
|
||||
DatabaseName: coll.DatabaseName,
|
||||
}
|
||||
|
||||
return cloneColl
|
||||
|
@ -267,17 +269,29 @@ func (m *meta) GetCollectionBinlogSize() (int64, map[UniqueID]int64) {
|
|||
if isSegmentHealthy(segment) && !segment.GetIsImporting() {
|
||||
total += segmentSize
|
||||
collectionBinlogSize[segment.GetCollectionID()] += segmentSize
|
||||
metrics.DataCoordStoredBinlogSize.WithLabelValues(
|
||||
fmt.Sprint(segment.GetCollectionID()), fmt.Sprint(segment.GetID())).Set(float64(segmentSize))
|
||||
|
||||
coll, ok := m.collections[segment.GetCollectionID()]
|
||||
if ok {
|
||||
metrics.DataCoordStoredBinlogSize.WithLabelValues(coll.DatabaseName,
|
||||
fmt.Sprint(segment.GetCollectionID()), fmt.Sprint(segment.GetID())).Set(float64(segmentSize))
|
||||
} else {
|
||||
log.Warn("not found database name", zap.Int64("collectionID", segment.GetCollectionID()))
|
||||
}
|
||||
|
||||
if _, ok := collectionRowsNum[segment.GetCollectionID()]; !ok {
|
||||
collectionRowsNum[segment.GetCollectionID()] = make(map[commonpb.SegmentState]int64)
|
||||
}
|
||||
collectionRowsNum[segment.GetCollectionID()][segment.GetState()] += segment.GetNumOfRows()
|
||||
}
|
||||
}
|
||||
for collection, statesRows := range collectionRowsNum {
|
||||
for collectionID, statesRows := range collectionRowsNum {
|
||||
for state, rows := range statesRows {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues(fmt.Sprint(collection), state.String()).Set(float64(rows))
|
||||
coll, ok := m.collections[collectionID]
|
||||
if ok {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), state.String()).Set(float64(rows))
|
||||
} else {
|
||||
log.Warn("not found database name", zap.Int64("collectionID", collectionID))
|
||||
}
|
||||
}
|
||||
}
|
||||
return total, collectionBinlogSize
|
||||
|
|
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/testutils"
|
||||
|
@ -400,6 +401,7 @@ func TestMeta_Basic(t *testing.T) {
|
|||
Schema: testSchema,
|
||||
Partitions: []UniqueID{partID0, partID1},
|
||||
StartPositions: []*commonpb.KeyDataPair{},
|
||||
DatabaseName: util.DefaultDBName,
|
||||
}
|
||||
collInfoWoPartition := &collectionInfo{
|
||||
ID: collID,
|
||||
|
@ -605,6 +607,12 @@ func TestMeta_Basic(t *testing.T) {
|
|||
assert.Len(t, collectionBinlogSize, 1)
|
||||
assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID])
|
||||
assert.Equal(t, int64(size0+size1), total)
|
||||
|
||||
meta.collections[collID] = collInfo
|
||||
total, collectionBinlogSize = meta.GetCollectionBinlogSize()
|
||||
assert.Len(t, collectionBinlogSize, 1)
|
||||
assert.Equal(t, int64(size0+size1), collectionBinlogSize[collID])
|
||||
assert.Equal(t, int64(size0+size1), total)
|
||||
})
|
||||
|
||||
t.Run("Test AddAllocation", func(t *testing.T) {
|
||||
|
|
|
@ -1201,6 +1201,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
|
|||
StartPositions: resp.GetStartPositions(),
|
||||
Properties: properties,
|
||||
CreatedAt: resp.GetCreatedTimestamp(),
|
||||
DatabaseName: resp.GetDbName(),
|
||||
}
|
||||
s.meta.AddCollection(collInfo)
|
||||
return nil
|
||||
|
|
|
@ -133,8 +133,8 @@ func updateProxyFunctionCallMetric(fullMethod string) {
|
|||
if method == "" {
|
||||
return
|
||||
}
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, "", "").Inc()
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, "", "").Inc()
|
||||
}
|
||||
|
||||
func getCurrentUser(ctx context.Context) string {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -128,6 +128,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||
panic(err)
|
||||
}
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
growing.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(growing.Collection()),
|
||||
fmt.Sprint(growing.Partition()),
|
||||
|
|
|
@ -117,6 +117,47 @@ func (_c *MockSegment_Collection_Call) RunAndReturn(run func() int64) *MockSegme
|
|||
return _c
|
||||
}
|
||||
|
||||
// DatabaseName provides a mock function with given fields:
|
||||
func (_m *MockSegment) DatabaseName() string {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 string
|
||||
if rf, ok := ret.Get(0).(func() string); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(string)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockSegment_DatabaseName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DatabaseName'
|
||||
type MockSegment_DatabaseName_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DatabaseName is a helper method to define mock.On call
|
||||
func (_e *MockSegment_Expecter) DatabaseName() *MockSegment_DatabaseName_Call {
|
||||
return &MockSegment_DatabaseName_Call{Call: _e.mock.On("DatabaseName")}
|
||||
}
|
||||
|
||||
func (_c *MockSegment_DatabaseName_Call) Run(run func()) *MockSegment_DatabaseName_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSegment_DatabaseName_Call) Return(_a0 string) *MockSegment_DatabaseName_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSegment_DatabaseName_Call) RunAndReturn(run func() string) *MockSegment_DatabaseName_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Delete provides a mock function with given fields: ctx, primaryKeys, timestamps
|
||||
func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64) error {
|
||||
ret := _m.Called(ctx, primaryKeys, timestamps)
|
||||
|
@ -911,6 +952,47 @@ func (_c *MockSegment_Release_Call) RunAndReturn(run func(...releaseOption)) *Mo
|
|||
return _c
|
||||
}
|
||||
|
||||
// ResourceGroup provides a mock function with given fields:
|
||||
func (_m *MockSegment) ResourceGroup() string {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 string
|
||||
if rf, ok := ret.Get(0).(func() string); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(string)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockSegment_ResourceGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResourceGroup'
|
||||
type MockSegment_ResourceGroup_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// ResourceGroup is a helper method to define mock.On call
|
||||
func (_e *MockSegment_Expecter) ResourceGroup() *MockSegment_ResourceGroup_Call {
|
||||
return &MockSegment_ResourceGroup_Call{Call: _e.mock.On("ResourceGroup")}
|
||||
}
|
||||
|
||||
func (_c *MockSegment_ResourceGroup_Call) Run(run func()) *MockSegment_ResourceGroup_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSegment_ResourceGroup_Call) Return(_a0 string) *MockSegment_ResourceGroup_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockSegment_ResourceGroup_Call) RunAndReturn(run func() string) *MockSegment_ResourceGroup_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// ResourceUsageEstimate provides a mock function with given fields:
|
||||
func (_m *MockSegment) ResourceUsageEstimate() ResourceUsage {
|
||||
ret := _m.Called()
|
||||
|
|
|
@ -118,6 +118,14 @@ func (s *baseSegment) Partition() int64 {
|
|||
return s.loadInfo.GetPartitionID()
|
||||
}
|
||||
|
||||
func (s *baseSegment) DatabaseName() string {
|
||||
return s.collection.GetDBName()
|
||||
}
|
||||
|
||||
func (s *baseSegment) ResourceGroup() string {
|
||||
return s.collection.GetResourceGroup()
|
||||
}
|
||||
|
||||
func (s *baseSegment) Shard() string {
|
||||
return s.loadInfo.GetInsertChannel()
|
||||
}
|
||||
|
@ -1361,6 +1369,7 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
|
|||
C.DeleteSegment(ptr)
|
||||
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
s.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(s.Collection()),
|
||||
fmt.Sprint(s.Partition()),
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
||||
storage "github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -50,6 +50,8 @@ type Segment interface {
|
|||
|
||||
// Properties
|
||||
ID() int64
|
||||
DatabaseName() string
|
||||
ResourceGroup() string
|
||||
Collection() int64
|
||||
Partition() int64
|
||||
Shard() string
|
||||
|
|
|
@ -1027,6 +1027,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
|
|||
}
|
||||
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
segment.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(segment.Collection()),
|
||||
fmt.Sprint(segment.Partition()),
|
||||
|
@ -1263,6 +1264,14 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
|
|||
return err
|
||||
}
|
||||
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
segment.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(segment.Collection()),
|
||||
fmt.Sprint(segment.Partition()),
|
||||
segment.Type().String(),
|
||||
strconv.FormatInt(int64(len(segment.Indexes())), 10),
|
||||
).Sub(float64(deltaData.RowCount))
|
||||
log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -133,11 +133,10 @@ func (mt *MetaTable) reload() error {
|
|||
mt.names = newNameDb()
|
||||
mt.aliases = newNameDb()
|
||||
|
||||
collectionNum := int64(0)
|
||||
partitionNum := int64(0)
|
||||
|
||||
metrics.RootCoordNumOfCollections.Set(float64(0))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Set(float64(0))
|
||||
metrics.RootCoordNumOfCollections.Reset()
|
||||
metrics.RootCoordNumOfPartitions.Reset()
|
||||
|
||||
// recover databases.
|
||||
dbs, err := mt.catalog.ListDatabases(mt.ctx, typeutil.MaxTimestamp)
|
||||
|
@ -173,6 +172,7 @@ func (mt *MetaTable) reload() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
collectionNum := int64(0)
|
||||
for _, collection := range collections {
|
||||
mt.collID2Meta[collection.CollectionID] = collection
|
||||
if collection.Available() {
|
||||
|
@ -181,9 +181,12 @@ func (mt *MetaTable) reload() error {
|
|||
partitionNum += int64(collection.GetPartitionNum(true))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("recover collections from db", zap.Int64("collection_num", collectionNum), zap.Int64("partition_num", partitionNum))
|
||||
metrics.RootCoordNumOfCollections.WithLabelValues(dbName).Add(float64(collectionNum))
|
||||
log.Info("collections recovered from db", zap.String("db_name", dbName),
|
||||
zap.Int64("collection_num", collectionNum),
|
||||
zap.Int64("partition_num", partitionNum))
|
||||
}
|
||||
|
||||
// recover aliases from db namespace
|
||||
for dbName, db := range mt.dbName2Meta {
|
||||
|
@ -197,7 +200,6 @@ func (mt *MetaTable) reload() error {
|
|||
}
|
||||
}
|
||||
|
||||
metrics.RootCoordNumOfCollections.Add(float64(collectionNum))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum))
|
||||
log.Info("RootCoord meta table reload done", zap.Duration("duration", record.ElapseSpan()))
|
||||
return nil
|
||||
|
@ -233,7 +235,7 @@ func (mt *MetaTable) reloadWithNonDatabase() error {
|
|||
mt.aliases.insert(util.DefaultDBName, alias.Name, alias.CollectionID)
|
||||
}
|
||||
|
||||
metrics.RootCoordNumOfCollections.Add(float64(collectionNum))
|
||||
metrics.RootCoordNumOfCollections.WithLabelValues(util.DefaultDBName).Add(float64(collectionNum))
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(partitionNum))
|
||||
return nil
|
||||
}
|
||||
|
@ -398,12 +400,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni
|
|||
}
|
||||
mt.collID2Meta[collectionID] = clone
|
||||
|
||||
db, err := mt.getDatabaseByIDInternal(ctx, coll.DBID, typeutil.MaxTimestamp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dbID not found for collection:%d", collectionID)
|
||||
}
|
||||
|
||||
switch state {
|
||||
case pb.CollectionState_CollectionCreated:
|
||||
metrics.RootCoordNumOfCollections.Inc()
|
||||
metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc()
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true)))
|
||||
default:
|
||||
metrics.RootCoordNumOfCollections.Dec()
|
||||
metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec()
|
||||
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true)))
|
||||
}
|
||||
|
||||
|
|
|
@ -1333,6 +1333,26 @@ func TestMetaTable_ChangeCollectionState(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("not found dbID", func(t *testing.T) {
|
||||
catalog := mocks.NewRootCoordCatalog(t)
|
||||
catalog.On("AlterCollection",
|
||||
mock.Anything, // context.Context
|
||||
mock.Anything, // *model.Collection
|
||||
mock.Anything, // *model.Collection
|
||||
mock.Anything, // metastore.AlterType
|
||||
mock.AnythingOfType("uint64"),
|
||||
).Return(nil)
|
||||
meta := &MetaTable{
|
||||
catalog: catalog,
|
||||
dbName2Meta: map[string]*model.Database{},
|
||||
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||
100: {Name: "test", CollectionID: 100, DBID: util.DefaultDBID},
|
||||
},
|
||||
}
|
||||
err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("normal case", func(t *testing.T) {
|
||||
catalog := mocks.NewRootCoordCatalog(t)
|
||||
catalog.On("AlterCollection",
|
||||
|
@ -1344,8 +1364,11 @@ func TestMetaTable_ChangeCollectionState(t *testing.T) {
|
|||
).Return(nil)
|
||||
meta := &MetaTable{
|
||||
catalog: catalog,
|
||||
dbName2Meta: map[string]*model.Database{
|
||||
util.DefaultDBName: {Name: util.DefaultDBName, ID: util.DefaultDBID},
|
||||
},
|
||||
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||
100: {Name: "test", CollectionID: 100},
|
||||
100: {Name: "test", CollectionID: 100, DBID: util.DefaultDBID},
|
||||
},
|
||||
}
|
||||
err := meta.ChangeCollectionState(context.TODO(), 100, pb.CollectionState_CollectionCreated, 1000)
|
||||
|
|
|
@ -894,6 +894,7 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfDatabases.Dec()
|
||||
metrics.CleanupRootCoordDBMetrics(in.GetDbName())
|
||||
log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
|
|
|
@ -82,6 +82,8 @@ var (
|
|||
Help: "stored l0 segment rate",
|
||||
}, []string{})
|
||||
|
||||
// DataCoordNumStoredRows all metrics will be cleaned up after removing matched collectionID and
|
||||
// segment state labels in CleanupDataCoordNumStoredRows method.
|
||||
DataCoordNumStoredRows = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
|
@ -89,6 +91,7 @@ var (
|
|||
Name: "stored_rows_num",
|
||||
Help: "number of stored rows of healthy segment",
|
||||
}, []string{
|
||||
databaseLabelName,
|
||||
collectionIDLabelName,
|
||||
segmentStateLabelName,
|
||||
})
|
||||
|
@ -132,6 +135,7 @@ var (
|
|||
Name: "stored_binlog_size",
|
||||
Help: "binlog size of healthy segments",
|
||||
}, []string{
|
||||
databaseLabelName,
|
||||
collectionIDLabelName,
|
||||
segmentIDLabelName,
|
||||
})
|
||||
|
@ -318,7 +322,7 @@ func CleanupDataCoordSegmentMetrics(collectionID int64, segmentID int64) {
|
|||
|
||||
func CleanupDataCoordNumStoredRows(collectionID int64) {
|
||||
for _, state := range commonpb.SegmentState_name {
|
||||
DataCoordNumStoredRows.Delete(prometheus.Labels{
|
||||
DataCoordNumStoredRows.DeletePartialMatch(prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
segmentStateLabelName: fmt.Sprint(state),
|
||||
})
|
||||
|
|
|
@ -81,6 +81,8 @@ const (
|
|||
functionLabelName = "function_name"
|
||||
queryTypeLabelName = "query_type"
|
||||
collectionName = "collection_name"
|
||||
databaseLabelName = "db_name"
|
||||
resourceGroupLabelName = "rg"
|
||||
indexName = "index_name"
|
||||
isVectorIndex = "is_vector_index"
|
||||
segmentStateLabelName = "segment_state"
|
||||
|
|
|
@ -69,3 +69,69 @@ func TestRegisterRuntimeInfo(t *testing.T) {
|
|||
assert.Equal(t, "etcd", metaType)
|
||||
assert.Equal(t, "pulsar", mqType)
|
||||
}
|
||||
|
||||
// TestDeletePartialMatch test deletes all metrics where the variable labels contain all of those
|
||||
// passed in as labels based on DeletePartialMatch API
|
||||
func TestDeletePartialMatch(t *testing.T) {
|
||||
baseVec := prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "test",
|
||||
Help: "helpless",
|
||||
},
|
||||
[]string{"l1", "l2", "l3"},
|
||||
)
|
||||
|
||||
baseVec.WithLabelValues("l1-1", "l2-1", "l3-1").Inc()
|
||||
baseVec.WithLabelValues("l1-2", "l2-2", "l3-2").Inc()
|
||||
baseVec.WithLabelValues("l1-2", "l2-3", "l3-3").Inc()
|
||||
|
||||
baseVec.WithLabelValues("l1-3", "l2-3", "l3-3").Inc()
|
||||
baseVec.WithLabelValues("l1-3", "l2-3", "").Inc()
|
||||
baseVec.WithLabelValues("l1-3", "l2-4", "l3-4").Inc()
|
||||
|
||||
baseVec.WithLabelValues("l1-4", "l2-5", "l3-5").Inc()
|
||||
baseVec.WithLabelValues("l1-4", "l2-5", "l3-6").Inc()
|
||||
baseVec.WithLabelValues("l1-5", "l2-6", "l3-6").Inc()
|
||||
|
||||
getMetricsCount := func() int {
|
||||
chs := make(chan prometheus.Metric, 10)
|
||||
baseVec.Collect(chs)
|
||||
return len(chs)
|
||||
}
|
||||
|
||||
// the prefix is matched which has one labels
|
||||
if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-2"}), 2; got != want {
|
||||
t.Errorf("got %v, want %v", got, want)
|
||||
}
|
||||
assert.Equal(t, 7, getMetricsCount())
|
||||
|
||||
// the prefix is matched which has two labels
|
||||
if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-3", "l2": "l2-3"}), 2; got != want {
|
||||
t.Errorf("got %v, want %v", got, want)
|
||||
}
|
||||
assert.Equal(t, 5, getMetricsCount())
|
||||
|
||||
// the first and latest labels are matched
|
||||
if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-1", "l3": "l3-1"}), 1; got != want {
|
||||
t.Errorf("got %v, want %v", got, want)
|
||||
}
|
||||
assert.Equal(t, 4, getMetricsCount())
|
||||
|
||||
// the middle labels are matched
|
||||
if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l2": "l2-5"}), 2; got != want {
|
||||
t.Errorf("got %v, want %v", got, want)
|
||||
}
|
||||
assert.Equal(t, 2, getMetricsCount())
|
||||
|
||||
// the middle labels and suffix labels are matched
|
||||
if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l2": "l2-6", "l3": "l3-6"}), 1; got != want {
|
||||
t.Errorf("got %v, want %v", got, want)
|
||||
}
|
||||
assert.Equal(t, 1, getMetricsCount())
|
||||
|
||||
// all labels are matched
|
||||
if got, want := baseVec.DeletePartialMatch(prometheus.Labels{"l1": "l1-3", "l2": "l2-4", "l3": "l3-4"}), 1; got != want {
|
||||
t.Errorf("got %v, want %v", got, want)
|
||||
}
|
||||
assert.Equal(t, 0, getMetricsCount())
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ var (
|
|||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "search_vectors_count",
|
||||
Help: "counter of vectors successfully searched",
|
||||
}, []string{nodeIDLabelName})
|
||||
}, []string{nodeIDLabelName, databaseLabelName, collectionName})
|
||||
|
||||
// ProxyInsertVectors record the number of vectors insert successfully.
|
||||
ProxyInsertVectors = prometheus.NewCounterVec(
|
||||
|
@ -49,7 +49,7 @@ var (
|
|||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "insert_vectors_count",
|
||||
Help: "counter of vectors successfully inserted",
|
||||
}, []string{nodeIDLabelName})
|
||||
}, []string{nodeIDLabelName, databaseLabelName, collectionName})
|
||||
|
||||
// ProxyUpsertVectors record the number of vectors upsert successfully.
|
||||
ProxyUpsertVectors = prometheus.NewCounterVec(
|
||||
|
@ -58,7 +58,7 @@ var (
|
|||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "upsert_vectors_count",
|
||||
Help: "counter of vectors successfully upserted",
|
||||
}, []string{nodeIDLabelName})
|
||||
}, []string{nodeIDLabelName, databaseLabelName, collectionName})
|
||||
|
||||
ProxyDeleteVectors = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
|
@ -76,9 +76,10 @@ var (
|
|||
Name: "sq_latency",
|
||||
Help: "latency of search or query successfully",
|
||||
Buckets: buckets,
|
||||
}, []string{nodeIDLabelName, queryTypeLabelName})
|
||||
}, []string{nodeIDLabelName, queryTypeLabelName, databaseLabelName, collectionName})
|
||||
|
||||
// ProxyCollectionSQLatency record the latency of search successfully, per collection
|
||||
// Deprecated, ProxySQLatency instead of it
|
||||
ProxyCollectionSQLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
|
@ -96,9 +97,10 @@ var (
|
|||
Name: "mutation_latency",
|
||||
Help: "latency of insert or delete successfully",
|
||||
Buckets: buckets, // unit: ms
|
||||
}, []string{nodeIDLabelName, msgTypeLabelName})
|
||||
}, []string{nodeIDLabelName, msgTypeLabelName, databaseLabelName, collectionName})
|
||||
|
||||
// ProxyMutationLatency record the latency that mutate successfully, per collection
|
||||
// ProxyCollectionMutationLatency record the latency that mutate successfully, per collection
|
||||
// Deprecated, ProxyMutationLatency instead of it
|
||||
ProxyCollectionMutationLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: milvusNamespace,
|
||||
|
@ -107,6 +109,7 @@ var (
|
|||
Help: "latency of insert or delete successfully, per collection",
|
||||
Buckets: buckets,
|
||||
}, []string{nodeIDLabelName, msgTypeLabelName, collectionName})
|
||||
|
||||
// ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result.
|
||||
ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
|
@ -230,7 +233,7 @@ var (
|
|||
Subsystem: typeutil.ProxyRole,
|
||||
Name: "req_count",
|
||||
Help: "count of operation executed",
|
||||
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
|
||||
}, []string{nodeIDLabelName, functionLabelName, statusLabelName, databaseLabelName, collectionName})
|
||||
|
||||
// ProxyReqLatency records the latency that for all requests, like "CreateCollection".
|
||||
ProxyReqLatency = prometheus.NewHistogramVec(
|
||||
|
@ -372,7 +375,59 @@ func RegisterProxy(registry *prometheus.Registry) {
|
|||
registry.MustRegister(ProxySlowQueryCount)
|
||||
}
|
||||
|
||||
func CleanupCollectionMetrics(nodeID int64, collection string) {
|
||||
func CleanupProxyDBMetrics(nodeID int64, dbName string) {
|
||||
ProxySearchVectors.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
ProxyInsertVectors.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
ProxyUpsertVectors.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
ProxySQLatency.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
ProxyMutationLatency.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
ProxyFunctionCall.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
}
|
||||
|
||||
func CleanupProxyCollectionMetrics(nodeID int64, collection string) {
|
||||
ProxySearchVectors.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
collectionName: collection,
|
||||
})
|
||||
ProxyInsertVectors.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
collectionName: collection,
|
||||
})
|
||||
ProxyUpsertVectors.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
collectionName: collection,
|
||||
})
|
||||
ProxySQLatency.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
collectionName: collection,
|
||||
})
|
||||
ProxyMutationLatency.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
collectionName: collection,
|
||||
})
|
||||
ProxyFunctionCall.DeletePartialMatch(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
collectionName: collection,
|
||||
})
|
||||
|
||||
ProxyCollectionSQLatency.Delete(prometheus.Labels{
|
||||
nodeIDLabelName: strconv.FormatInt(nodeID, 10),
|
||||
queryTypeLabelName: SearchLabel, collectionName: collection,
|
||||
|
|
|
@ -362,6 +362,7 @@ var (
|
|||
Name: "entity_num",
|
||||
Help: "number of entities which can be searched/queried, clustered by collection, partition and state",
|
||||
}, []string{
|
||||
databaseLabelName,
|
||||
nodeIDLabelName,
|
||||
collectionIDLabelName,
|
||||
partitionIDLabelName,
|
||||
|
@ -567,5 +568,11 @@ func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
|
|||
msgTypeLabelName: label,
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
QueryNodeNumEntities.
|
||||
DeletePartialMatch(
|
||||
prometheus.Labels{
|
||||
nodeIDLabelName: fmt.Sprint(nodeID),
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,13 +93,13 @@ var (
|
|||
})
|
||||
|
||||
// RootCoordNumOfCollections counts the number of collections.
|
||||
RootCoordNumOfCollections = prometheus.NewGauge(
|
||||
RootCoordNumOfCollections = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.RootCoordRole,
|
||||
Name: "collection_num",
|
||||
Help: "number of collections",
|
||||
})
|
||||
}, []string{databaseLabelName})
|
||||
|
||||
// RootCoordNumOfPartitions counts the number of partitions per collection.
|
||||
RootCoordNumOfPartitions = prometheus.NewGaugeVec(
|
||||
|
@ -247,3 +247,9 @@ func RegisterRootCoord(registry *prometheus.Registry) {
|
|||
registry.MustRegister(RootCoordNumEntities)
|
||||
registry.MustRegister(RootCoordIndexedNumEntities)
|
||||
}
|
||||
|
||||
func CleanupRootCoordDBMetrics(dbName string) {
|
||||
RootCoordNumOfCollections.Delete(prometheus.Labels{
|
||||
databaseLabelName: dbName,
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue