enhance: Add metrics for querynode delete buffer info (#37081)

Related to #35303

This PR adds metrics for querynode delegator delete buffer information
(row count and size in bytes), which is related to the DML quota logic.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/37124/head
congqixia 2024-10-24 10:47:28 +08:00 committed by GitHub
parent f43527ef6f
commit d8db3e8761
5 changed files with 69 additions and 21 deletions
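
The change exposes the delegator delete buffer occupancy as two per-channel gauges, keyed by query node ID and vchannel, so the DML quota logic has a per-channel signal to watch. The standalone Go sketch below shows that shape; the literal namespace, subsystem, and label strings are placeholders for the Milvus constants (milvusNamespace, typeutil.QueryNodeRole, nodeIDLabelName, channelNameLabelName) used in the actual diff.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// Two gauges keyed by (node ID, channel), mirroring the metrics this PR adds.
// Namespace/subsystem/label strings below are illustrative placeholders.
var (
    deleteBufferSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Namespace: "milvus",
        Subsystem: "querynode",
        Name:      "delete_buffer_size",
        Help:      "delegator delete buffer size (in bytes)",
    }, []string{"node_id", "channel_name"})

    deleteBufferRowNum = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Namespace: "milvus",
        Subsystem: "querynode",
        Name:      "delete_buffer_row_num",
        Help:      "delegator delete buffer row num",
    }, []string{"node_id", "channel_name"})
)

func main() {
    reg := prometheus.NewRegistry()
    reg.MustRegister(deleteBufferSize, deleteBufferRowNum)

    // A delegator for channel "dml-1" on node 1 would publish its buffer state like this.
    nodeID, channel := int64(1), "dml-1"
    deleteBufferSize.WithLabelValues(fmt.Sprint(nodeID), channel).Set(4096)
    deleteBufferRowNum.WithLabelValues(fmt.Sprint(nodeID), channel).Set(128)
}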


@@ -865,7 +865,6 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
})
}
func TestShouldDropChannel(t *testing.T) {


@@ -843,6 +843,9 @@ func (sd *shardDelegator) Close() {
// broadcast to all waitTsafe goroutine to quit
sd.tsCond.Broadcast()
sd.lifetime.Wait()
+metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)
+metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName)
}
// As partition stats is an optimization for search/query which is not mandatory for milvus instance,
@@ -915,16 +918,17 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))
sd := &shardDelegator{
-collectionID: collectionID,
-replicaID: replicaID,
-vchannelName: channel,
-version: version,
-collection: collection,
-segmentManager: manager.Segment,
-workerManager: workerManager,
-lifetime: lifetime.NewLifetime(lifetime.Initializing),
-distribution: NewDistribution(),
-deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
+collectionID: collectionID,
+replicaID: replicaID,
+vchannelName: channel,
+version: version,
+collection: collection,
+segmentManager: manager.Segment,
+workerManager: workerManager,
+lifetime: lifetime.NewLifetime(lifetime.Initializing),
+distribution: NewDistribution(),
+deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock,
+[]string{fmt.Sprint(paramtable.GetNodeID()), channel}),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
latestTsafe: atomic.NewUint64(startTs),
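
Two things happen on the delegator side: the constructor threads the node-ID/channel label pair into the buffer so it can publish under the right series, and Close() deletes those series so a released delegator stops reporting a stale value. A hedged sketch of the cleanup pattern, using plain client_golang calls and local stand-in names rather than the Milvus metrics package:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// bufferSizeGauge is a local stand-in for metrics.QueryNodeDeleteBufferSize.
var bufferSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
    Name: "delete_buffer_size",
    Help: "delegator delete buffer size (in bytes)",
}, []string{"node_id", "channel_name"})

// delegator is a toy shard delegator carrying only the fields needed here.
type delegator struct {
    nodeID       int64
    vchannelName string
}

// Close drops the gauge series owned by this delegator so the time series
// disappears instead of freezing at its last observed value.
func (d *delegator) Close() {
    bufferSizeGauge.DeleteLabelValues(fmt.Sprint(d.nodeID), d.vchannelName)
}

func main() {
    d := &delegator{nodeID: 1, vchannelName: "dml-1"}
    bufferSizeGauge.WithLabelValues(fmt.Sprint(d.nodeID), d.vchannelName).Set(2048)
    d.Close() // the ("1", "dml-1") series is removed from the collector
}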


@@ -20,13 +20,16 @@ import (
"sync"
"github.com/cockroachdb/errors"
+"github.com/milvus-io/milvus/pkg/metrics"
)
-func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64) DeleteBuffer[T] {
+func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []string) DeleteBuffer[T] {
return &listDeleteBuffer[T]{
safeTs: startTs,
sizePerBlock: sizePerBlock,
list: []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
+labels: labels,
}
}
@@ -40,6 +43,18 @@ type listDeleteBuffer[T timed] struct {
safeTs uint64
sizePerBlock int64
+// cached metrics
+rowNum int64
+size int64
+// metrics labels
+labels []string
}
+func (b *listDeleteBuffer[T]) updateMetrics() {
+metrics.QueryNodeDeleteBufferRowNum.WithLabelValues(b.labels...).Set(float64(b.rowNum))
+metrics.QueryNodeDeleteBufferSize.WithLabelValues(b.labels...).Set(float64(b.size))
+}
func (b *listDeleteBuffer[T]) Put(entry T) {
@@ -51,6 +66,11 @@ func (b *listDeleteBuffer[T]) Put(entry T) {
if errors.Is(err, errBufferFull) {
b.list = append(b.list, newCacheBlock[T](entry.Timestamp(), b.sizePerBlock, entry))
}
+// update metrics
+b.rowNum += entry.EntryNum()
+b.size += entry.Size()
+b.updateMetrics()
}
func (b *listDeleteBuffer[T]) ListAfter(ts uint64) []T {
@@ -87,9 +107,13 @@ func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
if nextHead > 0 {
for idx := 0; idx < nextHead; idx++ {
+rowNum, memSize := b.list[idx].Size()
+b.rowNum -= rowNum
+b.size -= memSize
b.list[idx] = nil
}
b.list = b.list[nextHead:]
+b.updateMetrics()
}
}
@@ -97,10 +121,5 @@ func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) {
b.mut.RLock()
defer b.mut.RUnlock()
-for _, block := range b.list {
-blockNum, blockSize := block.Size()
-entryNum += blockNum
-memorySize += blockSize
-}
-return entryNum, memorySize
+return b.rowNum, b.size
}
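
With the counters cached on the buffer, Size() no longer walks every cached block, and the same counters drive the gauge updates on Put and TryDiscard. A simplified sketch of that bookkeeping, assuming a flat slice of entries in place of the real block list and a plain callback in place of updateMetrics (the real TryDiscard drops whole blocks, not individual entries):

package main

import (
    "fmt"
    "sync"
)

// entry stands in for the buffered delete records (deletebuffer.Item).
type entry struct {
    ts    uint64
    rows  int64
    bytes int64
}

// cachedBuffer mimics listDeleteBuffer's bookkeeping: totals are adjusted
// incrementally and pushed to a metrics hook instead of being recomputed.
type cachedBuffer struct {
    mut     sync.RWMutex
    entries []entry
    rowNum  int64
    size    int64
    report  func(rowNum, size int64) // stand-in for updateMetrics
}

func (b *cachedBuffer) Put(e entry) {
    b.mut.Lock()
    defer b.mut.Unlock()
    b.entries = append(b.entries, e)
    b.rowNum += e.rows
    b.size += e.bytes
    b.report(b.rowNum, b.size)
}

// TryDiscard drops entries older than ts and subtracts their contribution
// from the cached totals.
func (b *cachedBuffer) TryDiscard(ts uint64) {
    b.mut.Lock()
    defer b.mut.Unlock()
    kept := b.entries[:0]
    for _, e := range b.entries {
        if e.ts < ts {
            b.rowNum -= e.rows
            b.size -= e.bytes
            continue
        }
        kept = append(kept, e)
    }
    b.entries = kept
    b.report(b.rowNum, b.size)
}

// Size returns the cached totals without walking the entries.
func (b *cachedBuffer) Size() (entryNum, memorySize int64) {
    b.mut.RLock()
    defer b.mut.RUnlock()
    return b.rowNum, b.size
}

func main() {
    b := &cachedBuffer{report: func(rows, size int64) {
        fmt.Printf("rows=%d size=%d\n", rows, size)
    }}
    b.Put(entry{ts: 11, rows: 2, bytes: 64})
    b.Put(entry{ts: 20, rows: 3, bytes: 96})
    b.TryDiscard(15) // drops the ts=11 entry and its counts
    fmt.Println(b.Size())
}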


@@ -29,7 +29,7 @@ type ListDeleteBufferSuite struct {
}
func (s *ListDeleteBufferSuite) TestNewBuffer() {
-buffer := NewListDeleteBuffer[*Item](10, 1000)
+buffer := NewListDeleteBuffer[*Item](10, 1000, []string{"1", "dml-1"})
s.EqualValues(10, buffer.SafeTs())
@@ -39,7 +39,7 @@ func (s *ListDeleteBufferSuite) TestNewBuffer() {
}
func (s *ListDeleteBufferSuite) TestCache() {
-buffer := NewListDeleteBuffer[*Item](10, 1000)
+buffer := NewListDeleteBuffer[*Item](10, 1000, []string{"1", "dml-1"})
buffer.Put(&Item{
Ts: 11,
Data: []BufferItem{
@@ -68,7 +68,7 @@ func (s *ListDeleteBufferSuite) TestCache() {
}
func (s *ListDeleteBufferSuite) TestTryDiscard() {
-buffer := NewListDeleteBuffer[*Item](10, 1)
+buffer := NewListDeleteBuffer[*Item](10, 1, []string{"1", "dml-1"})
buffer.Put(&Item{
Ts: 10,
Data: []BufferItem{
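
The updated tests only thread dummy labels through the constructor; if one also wanted to assert the exported values, client_golang's testutil package can read a single labelled series back. A small sketch with a local gauge standing in for the Milvus metric (not part of this PR's test suite):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

// rowNumGauge is a local stand-in for metrics.QueryNodeDeleteBufferRowNum.
var rowNumGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
    Name: "delete_buffer_row_num",
    Help: "delegator delete buffer row num",
}, []string{"node_id", "channel_name"})

func main() {
    labels := []string{"1", "dml-1"}
    rowNumGauge.WithLabelValues(labels...).Set(2)

    // testutil.ToFloat64 reads back the single series selected by the labels,
    // which is convenient for asserting metric updates in unit tests.
    got := testutil.ToFloat64(rowNumGauge.WithLabelValues(labels...))
    fmt.Println(got == 2) // true
}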


@@ -765,6 +765,30 @@
}, []string{
nodeIDLabelName,
})
+QueryNodeDeleteBufferSize = prometheus.NewGaugeVec(
+prometheus.GaugeOpts{
+Namespace: milvusNamespace,
+Subsystem: typeutil.QueryNodeRole,
+Name: "delete_buffer_size",
+Help: "delegator delete buffer size (in bytes)",
+}, []string{
+nodeIDLabelName,
+channelNameLabelName,
+},
+)
+QueryNodeDeleteBufferRowNum = prometheus.NewGaugeVec(
+prometheus.GaugeOpts{
+Namespace: milvusNamespace,
+Subsystem: typeutil.QueryNodeRole,
+Name: "delete_buffer_row_num",
+Help: "delegator delete buffer row num",
+}, []string{
+nodeIDLabelName,
+channelNameLabelName,
+},
+)
)
// RegisterQueryNode registers QueryNode metrics
@@ -832,6 +856,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeApplyBFCost)
registry.MustRegister(QueryNodeForwardDeleteCost)
registry.MustRegister(QueryNodeSearchHitSegmentNum)
+registry.MustRegister(QueryNodeDeleteBufferSize)
+registry.MustRegister(QueryNodeDeleteBufferRowNum)
// Add cgo metrics
RegisterCGOMetrics(registry)
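
Registering the two collectors only makes them known to the registry; they become scrapeable once that registry is served. Milvus already has its own metrics endpoint wiring, so the following is purely an illustrative sketch of serving a fresh registry over /metrics with promhttp, with a made-up port and gauge:

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    registry := prometheus.NewRegistry()

    // A single illustrative gauge; in the PR both buffer gauges are registered.
    sizeGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Name: "delete_buffer_size",
        Help: "delegator delete buffer size (in bytes)",
    }, []string{"node_id", "channel_name"})
    registry.MustRegister(sizeGauge)

    sizeGauge.WithLabelValues("1", "dml-1").Set(1024)

    // Scraping http://localhost:9091/metrics now returns the registered series.
    http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
    log.Fatal(http.ListenAndServe(":9091", nil))
}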