Refine segment stat metrics on QueryNode (#20763)

issue: #20760

/kind improvement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
pull/20860/head
Ten Thousand Leaves 2022-11-28 14:53:14 +08:00 committed by GitHub
parent dd7e309a29
commit 4f9a294b2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 187 additions and 49 deletions

View File

@ -63,6 +63,7 @@ const (
indexTaskStatusLabelName = "index_task_status"
msgTypeLabelName = "msg_type"
collectionIDLabelName = "collection_id"
partitionIDLabelName = "partition_id"
channelNameLabelName = "channel_name"
functionLabelName = "function_name"
queryTypeLabelName = "query_type"
@ -72,6 +73,7 @@ const (
roleNameLabelName = "role_name"
cacheNameLabelName = "cache_name"
cacheStateLabelName = "cache_state"
indexCountLabelName = "indexed_field_count"
requestScope = "scope"
)

View File

@ -73,9 +73,13 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_num",
Help: "number of segments loaded",
Help: "number of segments loaded, clustered by its collection, partition, state and # of indexed fields",
}, []string{
nodeIDLabelName,
collectionIDLabelName,
partitionIDLabelName,
segmentStateLabelName,
indexCountLabelName,
})
QueryNodeNumDmlChannels = prometheus.NewGaugeVec(
@ -313,9 +317,12 @@ var (
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "entity_num",
Help: "number of entities which can be searched/queried",
Help: "number of entities which can be searched/queried, clustered by collection, partition and state",
}, []string{
nodeIDLabelName,
collectionIDLabelName,
partitionIDLabelName,
segmentStateLabelName,
})
// QueryNodeConsumeCounter counts the bytes QueryNode consumed from message storage.
@ -337,7 +344,7 @@ var (
}, []string{nodeIDLabelName, msgTypeLabelName})
)
//RegisterQueryNode registers QueryNode metrics
// RegisterQueryNode registers QueryNode metrics
func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeNumCollections)
registry.MustRegister(QueryNodeNumPartitions)

View File

@ -581,40 +581,61 @@ func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID,
if err != nil {
return err
}
return replica.addSegmentPrivate(segmentID, partitionID, seg)
return replica.addSegmentPrivate(seg)
}
// addSegmentPrivate is a private function in collectionReplica that adds a new segment to collectionReplica
func (replica *metaReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, segment *Segment) error {
partition, err := replica.getPartitionByIDPrivate(partitionID)
func (replica *metaReplica) addSegmentPrivate(segment *Segment) error {
segID := segment.segmentID
partition, err := replica.getPartitionByIDPrivate(segment.partitionID)
if err != nil {
return err
}
segType := segment.getType()
ok, err := replica.hasSegmentPrivate(segmentID, segType)
ok, err := replica.hasSegmentPrivate(segID, segType)
if err != nil {
return err
}
if ok {
return fmt.Errorf("segment has been existed, "+
"segmentID = %d, collectionID = %d, segmentType = %s", segmentID, segment.collectionID, segType.String())
"segmentID = %d, collectionID = %d, segmentType = %s", segID, segment.collectionID, segType.String())
}
partition.addSegmentID(segmentID, segType)
partition.addSegmentID(segID, segType)
switch segType {
case segmentTypeGrowing:
replica.growingSegments[segmentID] = segment
replica.growingSegments[segID] = segment
case segmentTypeSealed:
replica.sealedSegments[segmentID] = segment
replica.sealedSegments[segID] = segment
default:
return fmt.Errorf("unexpected segment type, segmentID = %d, segmentType = %s", segmentID, segType.String())
return fmt.Errorf("unexpected segment type, segmentID = %d, segmentType = %s", segID, segType.String())
}
metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
rowCount := segment.getRowCount()
log.Info("new segment added to collection replica",
zap.Int64("query node ID", paramtable.GetNodeID()),
zap.Int64("collection ID", segment.collectionID),
zap.Int64("partition ID", segment.partitionID),
zap.Int64("segment ID", segID),
zap.String("segment type", segType.String()),
zap.Int64("row count", rowCount),
zap.Uint64("segment indexed fields", segment.indexedFieldInfos.Len()),
)
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.collectionID),
fmt.Sprint(segment.partitionID),
string(segType),
fmt.Sprint(segment.indexedFieldInfos.Len()),
).Inc()
if rowCount > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(float64(rowCount))
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.collectionID),
fmt.Sprint(segment.partitionID),
string(segType),
).Add(float64(rowCount))
}
return nil
}
@ -633,7 +654,7 @@ func (replica *metaReplica) setSegment(segment *Segment) error {
return err
}
return replica.addSegmentPrivate(segment.segmentID, segment.partitionID, segment)
return replica.addSegmentPrivate(segment)
}
// removeSegment removes a segment from collectionReplica
@ -665,9 +686,12 @@ func (replica *metaReplica) removeSegment(segmentID UniqueID, segType segmentTyp
// removeSegmentPrivate is a private function in collectionReplica that removes a segment from collectionReplica
func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType segmentType) {
var rowCount int64
var segment *Segment
switch segType {
case segmentTypeGrowing:
if segment, ok := replica.growingSegments[segmentID]; ok {
var ok bool
if segment, ok = replica.growingSegments[segmentID]; ok {
if partition, ok := replica.partitions[segment.partitionID]; ok {
partition.removeSegmentID(segmentID, segType)
}
@ -676,11 +700,11 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
deleteSegment(segment)
}
case segmentTypeSealed:
if segment, ok := replica.sealedSegments[segmentID]; ok {
var ok bool
if segment, ok = replica.sealedSegments[segmentID]; ok {
if partition, ok := replica.partitions[segment.partitionID]; ok {
partition.removeSegmentID(segmentID, segType)
}
rowCount = segment.getRowCount()
delete(replica.sealedSegments, segmentID)
deleteSegment(segment)
@ -689,9 +713,38 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
panic(fmt.Sprintf("unsupported segment type %s", segType.String()))
}
metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
if rowCount > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(float64(rowCount))
if segment == nil {
// If not found.
log.Info("segment NOT removed from collection replica: segment not exist",
zap.Int64("segment ID", segmentID),
zap.String("segment type", segType.String()),
)
} else {
log.Info("segment removed from collection replica",
zap.Int64("QueryNode ID", paramtable.GetNodeID()),
zap.Int64("collection ID", segment.collectionID),
zap.Int64("partition ID", segment.partitionID),
zap.Int64("segment ID", segmentID),
zap.String("segment type", segType.String()),
zap.Int64("row count", rowCount),
zap.Uint64("segment indexed fields", segment.indexedFieldInfos.Len()),
)
metrics.QueryNodeNumSegments.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.collectionID),
fmt.Sprint(segment.partitionID),
string(segType),
// Note: this field is mutable after segment is loaded.
fmt.Sprint(segment.indexedFieldInfos.Len()),
).Dec()
if rowCount > 0 {
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(segment.collectionID),
fmt.Sprint(segment.partitionID),
string(segType),
).Sub(float64(rowCount))
}
}
}

View File

@ -136,7 +136,7 @@ func (s *Segment) getType() segmentType {
}
func (s *Segment) setIndexedFieldInfo(fieldID UniqueID, info *IndexedFieldInfo) {
s.indexedFieldInfos.Insert(fieldID, info)
s.indexedFieldInfos.InsertIfNotPresent(fieldID, info)
}
func (s *Segment) getIndexedFieldInfo(fieldID UniqueID) (*IndexedFieldInfo, error) {
@ -670,7 +670,7 @@ func (s *Segment) isPKExist(pk primaryKey) bool {
return false
}
//-------------------------------------------------------------------------------------- interfaces for growing segment
// -------------------------------------------------------------------------------------- interfaces for growing segment
func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
/*
long int
@ -759,7 +759,12 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
if err := HandleCStatus(&status, "Insert failed"); err != nil {
return err
}
metrics.QueryNodeNumEntities.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(float64(numOfRow))
metrics.QueryNodeNumEntities.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(s.collectionID),
fmt.Sprint(s.partitionID),
s.segmentType.String(),
).Add(float64(numOfRow))
s.setRecentlyModified(true)
return nil
}
@ -837,7 +842,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps
return nil
}
//-------------------------------------------------------------------------------------- interfaces for sealed segment
// -------------------------------------------------------------------------------------- interfaces for sealed segment
func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *schemapb.FieldData) error {
/*
CStatus

View File

@ -1,6 +1,10 @@
package typeutil
import "sync"
import (
"sync"
"go.uber.org/atomic"
)
// MapEqual returns true if the two map contain the same keys and values
func MapEqual(left, right map[int64]int64) bool {
@ -35,14 +39,23 @@ func GetMapKeys(src map[string]string) []string {
// ConcurrentMap is a generic, typed wrapper around sync.Map that additionally
// keeps its own entry counter, because sync.Map does not expose a length.
type ConcurrentMap[K comparable, V any] struct {
// inner holds the actual key-value pairs.
inner sync.Map
// Self-managed Len(), see: https://github.com/golang/go/issues/20680.
len atomic.Uint64
}
// NewConcurrentMap allocates an empty ConcurrentMap and returns a pointer to it.
func NewConcurrentMap[K comparable, V any]() *ConcurrentMap[K, V] {
	return new(ConcurrentMap[K, V])
}
func (m *ConcurrentMap[K, V]) Insert(key K, value V) {
m.inner.Store(key, value)
// Len reports the current value of the map's self-managed entry counter.
func (m *ConcurrentMap[K, V]) Len() uint64 {
	count := m.len.Load()
	return count
}
// InsertIfNotPresent stores the key-value pair only when the key is not yet in the map.
// An existing entry is left untouched and the length counter is not changed.
func (m *ConcurrentMap[K, V]) InsertIfNotPresent(key K, value V) {
	_, alreadyPresent := m.inner.LoadOrStore(key, value)
	if alreadyPresent {
		return
	}
	// A new entry was stored, so account for it in the self-managed length.
	m.len.Inc()
}
func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
@ -54,33 +67,24 @@ func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
return value.(V), true
}
func (m *ConcurrentMap[K, V]) get(key K) (V, bool) {
var zeroValue V
value, ok := m.inner.Load(key)
if !ok {
return zeroValue, ok
}
return value.(V), true
}
// GetOrInsert looks up `key` and returns the value now stored under it together with a `loaded` flag.
// If the key already exists, return the value and set `loaded` to true.
// If the key does not exist, insert the given `key` and `value` to map, return the value and set `loaded` to false.
func (m *ConcurrentMap[K, V]) GetOrInsert(key K, value V) (V, bool) {
var zeroValue V
loaded, exist := m.inner.LoadOrStore(key, value)
if !exist {
return zeroValue, exist
stored, loaded := m.inner.LoadOrStore(key, value)
if !loaded {
m.len.Inc()
return stored.(V), false
}
return loaded.(V), true
return stored.(V), true
}
func (m *ConcurrentMap[K, V]) GetAndRemove(key K) (V, bool) {
var zeroValue V
value, ok := m.inner.LoadAndDelete(key)
if !ok {
return zeroValue, ok
value, loaded := m.inner.LoadAndDelete(key)
if !loaded {
return zeroValue, false
}
m.len.Dec()
return value.(V), true
}
// Remove deletes `key` from the map. It is a no-op when the key is absent.
//
// The self-managed length counter is decremented only when an entry was
// actually removed, so Len() stays in sync with the map's contents.
func (m *ConcurrentMap[K, V]) Remove(key K) {
	if _, loaded := m.inner.LoadAndDelete(key); loaded {
		m.len.Dec()
	}
}

View File

@ -3,6 +3,7 @@ package typeutil
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
@ -54,6 +55,72 @@ func (suite *MapUtilSuite) TestGetMapKeys() {
suite.Contains(keys, "Alice", "Bob")
}
// TestConcurrentMap walks a ConcurrentMap through inserts, lookups and removals,
// checking the returned values, the loaded/exist flags and Len() after each phase.
func (suite *MapUtilSuite) TestConcurrentMap() {
	t := suite.T()
	type entry struct {
		key   int64
		value string
	}

	cm := NewConcurrentMap[int64, string]()
	assert.EqualValues(t, 0, cm.Len())

	// The first GetOrInsert stores the value; the two repeats only load it.
	for _, wantLoaded := range []bool{false, true, true} {
		got, loaded := cm.GetOrInsert(100, "v-100")
		assert.Equal(t, "v-100", got)
		assert.Equal(t, wantLoaded, loaded)
	}
	assert.Equal(t, uint64(1), cm.Len())
	assert.EqualValues(t, 1, cm.Len())

	// Key 100 already exists, so only 200 and 300 are new entries.
	cm.InsertIfNotPresent(100, "v-100-new")
	cm.InsertIfNotPresent(200, "v-200")
	cm.InsertIfNotPresent(300, "v-300")
	assert.Equal(t, uint64(3), cm.Len())
	assert.EqualValues(t, 3, cm.Len())

	stored := []entry{
		{key: 100, value: "v-100"},
		{key: 200, value: "v-200"},
		{key: 300, value: "v-300"},
	}

	// Plain lookups return the originally stored values.
	for _, e := range stored {
		got, exist := cm.Get(e.key)
		assert.Equal(t, e.value, got)
		assert.Equal(t, true, exist)
	}
	assert.EqualValues(t, 3, cm.Len())

	// GetOrInsert on existing keys must not overwrite them.
	for _, e := range stored {
		got, exist := cm.GetOrInsert(e.key, "new-v")
		assert.Equal(t, e.value, got)
		assert.Equal(t, true, exist)
	}
	got, exist := cm.GetOrInsert(400, "new-v")
	assert.Equal(t, "new-v", got)
	assert.Equal(t, false, exist)
	assert.EqualValues(t, 4, cm.Len())

	// Removing every stored key returns its value and drains the map.
	all := append(append([]entry{}, stored...), entry{key: 400, value: "new-v"})
	for _, e := range all {
		removed, loaded := cm.GetAndRemove(e.key)
		assert.Equal(t, e.value, removed)
		assert.Equal(t, true, loaded)
	}

	// A key that was never inserted yields the zero value and loaded == false.
	removed, loaded := cm.GetAndRemove(500)
	assert.Equal(t, "", removed)
	assert.Equal(t, false, loaded)
	assert.EqualValues(t, 0, cm.Len())
}
func TestMapUtil(t *testing.T) {
suite.Run(t, new(MapUtilSuite))
}