enhance: add more metrics for cache and search (#31777)

issue: #30931

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/31062/head
chyezh 2024-04-10 10:55:17 +08:00 committed by GitHub
parent 9d16aa0bd3
commit c9faa6d936
13 changed files with 925 additions and 42 deletions


@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/pkg/eventlog"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -185,12 +186,18 @@ func NewManager() *Manager {
}
info := segment.LoadInfo()
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) {
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (nop interface{}, err error) {
cacheLoadRecord := metricsutil.NewCacheLoadRecord(getSegmentMetricLabel(segment))
cacheLoadRecord.WithBytes(segment.ResourceUsageEstimate().DiskSize)
defer func() {
cacheLoadRecord.Finish(err)
}()
collection := manager.Collection.Get(segment.Collection())
if collection == nil {
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
}
err := manager.Loader.LoadSegment(context.Background(), segment.(*LocalSegment), info, LoadStatusMapped)
err = manager.Loader.LoadSegment(context.Background(), segment.(*LocalSegment), info, LoadStatusMapped)
return nil, err
})
if err != nil {
@ -200,6 +207,10 @@ func NewManager() *Manager {
return segment, true
}).WithFinalizer(func(key int64, segment Segment) error {
log.Debug("evict segment from cache", zap.Int64("segmentID", key))
cacheEvictRecord := metricsutil.NewCacheEvictRecord(getSegmentMetricLabel(segment))
cacheEvictRecord.WithBytes(segment.ResourceUsageEstimate().DiskSize)
defer cacheEvictRecord.Finish(nil)
segment.Release(WithReleaseScope(ReleaseScopeData))
return nil
}).Build()
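
The hunk above wraps both the cache load (inside the singleflight loader) and the eviction (inside the cache finalizer) with a metric record: the record is created up front, sized with the estimated disk usage, and a deferred Finish(err) closes the measurement so failed operations are not counted. A minimal sketch of that pattern, assuming the metricsutil API introduced later in this PR; loadSegmentData is a hypothetical stand-in for the real loader call:

```go
// Sketch of the record-then-finish pattern used in the loader above.
// loadSegmentData is hypothetical; the real code calls manager.Loader.LoadSegment.
func loadWithCacheMetrics(segment Segment) (err error) {
	record := metricsutil.NewCacheLoadRecord(getSegmentMetricLabel(segment))
	record.WithBytes(segment.ResourceUsageEstimate().DiskSize)
	defer func() {
		// Finish measures the duration since the record was created and
		// forwards the sample to the global observer; error samples are dropped.
		record.Finish(err)
	}()

	err = loadSegmentData(segment) // hypothetical load call
	return err
}
```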


@ -0,0 +1,271 @@
package metricsutil
import (
"strconv"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// labeledRecord is a labeled sample point.
type labeledRecord interface {
// Label of the access metric.
Label() SegmentLabel
// Finish finishes the record.
Finish(err error)
// getError returns the error of the record.
// the current metric system simply rejects records that carry an error.
getError() error
}
// globalObserver is the global segments metrics observer, keyed by segment label (database, resource group).
var (
once sync.Once
globalObserver *segmentsObserver
)
func getGlobalObserver() *segmentsObserver {
once.Do(func() {
globalObserver = newSegmentsObserver()
go func() {
d := 15 * time.Minute
ticker := time.NewTicker(d)
defer ticker.Stop()
for range ticker.C {
expireAt := time.Now().Add(-d)
globalObserver.Expire(expireAt)
}
}()
})
return globalObserver
}
// newSegmentsObserver creates a new segmentsObserver.
// Used to check if a segment is hot or cold.
func newSegmentsObserver() *segmentsObserver {
return &segmentsObserver{
nodeID: strconv.FormatInt(paramtable.GetNodeID(), 10),
segments: typeutil.NewConcurrentMap[SegmentLabel, *segmentObserver](),
}
}
// segmentsObserver is an observer for the metrics of all segments.
type segmentsObserver struct {
nodeID string
segments *typeutil.ConcurrentMap[SegmentLabel, *segmentObserver] // map segment id to observer.
// A segment can be removed from a query node for balancing or compaction.
// No more search operations will be performed on the segment after it is removed,
// so all related metrics should expire after a while.
// This may be a huge map with 100000+ entries.
}
// Observe records a new metric
func (o *segmentsObserver) Observe(m labeledRecord) {
if m.getError() != nil {
return // reject error record.
// TODO: add error as a label of metrics.
}
// fast path.
label := m.Label()
observer, ok := o.segments.Get(label)
if !ok {
// slow path.
newObserver := newSegmentObserver(o.nodeID, label)
observer, _ = o.segments.GetOrInsert(label, newObserver)
}
// observe the record.
observer.Observe(m)
}
// Expire removes and clears observers that have not been updated since expiredAt.
func (o *segmentsObserver) Expire(expiredAt time.Time) {
o.segments.Range(func(label SegmentLabel, value *segmentObserver) bool {
if value.IsExpired(expiredAt) {
o.segments.Remove(label)
value.Clear()
return true
}
return true
})
}
// newSegmentObserver creates a new segmentObserver.
func newSegmentObserver(nodeID string, label SegmentLabel) *segmentObserver {
now := time.Now()
return &segmentObserver{
label: label,
prom: newPromObserver(nodeID, label),
lastUpdates: atomic.NewPointer[time.Time](&now),
}
}
// segmentObserver is an observer for segment metrics.
type segmentObserver struct {
label SegmentLabel // never updates
// observers.
prom promMetricsObserver // prometheus metrics observer.
// for expiration.
lastUpdates *atomic.Pointer[time.Time] // update every access.
}
// IsExpired checks if the segment observer is expired.
func (o *segmentObserver) IsExpired(expireAt time.Time) bool {
return o.lastUpdates.Load().Before(expireAt)
}
// Observe records a new metric sample.
func (o *segmentObserver) Observe(m labeledRecord) {
now := time.Now()
o.lastUpdates.Store(&now)
switch mm := m.(type) {
case *CacheLoadRecord:
o.prom.ObserveCacheLoad(mm)
case *CacheEvictRecord:
o.prom.ObserveCacheEvict(mm)
case QuerySegmentAccessRecord:
o.prom.ObserveQueryAccess(mm)
case SearchSegmentAccessRecord:
o.prom.ObserveSearchAccess(mm)
default:
panic("unknown segment access metric")
}
}
// Clear clears the observer.
func (o *segmentObserver) Clear() {
o.prom.Clear()
}
// newPromObserver creates a new promMetricsObserver.
func newPromObserver(nodeID string, label SegmentLabel) promMetricsObserver {
return promMetricsObserver{
nodeID: nodeID,
label: label,
DiskCacheLoadTotal: metrics.QueryNodeDiskCacheLoadTotal.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup),
DiskCacheLoadDuration: metrics.QueryNodeDiskCacheLoadDuration.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup),
DiskCacheLoadBytes: metrics.QueryNodeDiskCacheLoadBytes.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup),
DiskCacheEvictTotal: metrics.QueryNodeDiskCacheEvictTotal.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup),
DiskCacheEvictDuration: metrics.QueryNodeDiskCacheEvictDuration.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup),
DiskCacheEvictBytes: metrics.QueryNodeDiskCacheEvictBytes.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup),
QuerySegmentAccessTotal: metrics.QueryNodeSegmentAccessTotal.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel),
QuerySegmentAccessDuration: metrics.QueryNodeSegmentAccessDuration.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel),
QuerySegmentAccessWaitCacheTotal: metrics.QueryNodeSegmentAccessWaitCacheTotal.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel),
QuerySegmentAccessWaitCacheDuration: metrics.QueryNodeSegmentAccessWaitCacheDuration.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel),
SearchSegmentAccessTotal: metrics.QueryNodeSegmentAccessTotal.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel),
SearchSegmentAccessDuration: metrics.QueryNodeSegmentAccessDuration.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel),
SearchSegmentAccessWaitCacheTotal: metrics.QueryNodeSegmentAccessWaitCacheTotal.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel),
SearchSegmentAccessWaitCacheDuration: metrics.QueryNodeSegmentAccessWaitCacheDuration.WithLabelValues(nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel),
DiskCacheLoadGlobalDuration: metrics.QueryNodeDiskCacheLoadGlobalDuration.WithLabelValues(nodeID),
DiskCacheEvictGlobalDuration: metrics.QueryNodeDiskCacheEvictGlobalDuration.WithLabelValues(nodeID),
QuerySegmentAccessGlobalDuration: metrics.QueryNodeSegmentAccessGlobalDuration.WithLabelValues(nodeID, metrics.QueryLabel),
SearchSegmentAccessGlobalDuration: metrics.QueryNodeSegmentAccessGlobalDuration.WithLabelValues(nodeID, metrics.SearchLabel),
QuerySegmentAccessWaitCacheGlobalDuration: metrics.QueryNodeSegmentAccessWaitCacheGlobalDuration.WithLabelValues(nodeID, metrics.QueryLabel),
SearchSegmentAccessWaitCacheGlobalDuration: metrics.QueryNodeSegmentAccessWaitCacheGlobalDuration.WithLabelValues(nodeID, metrics.SearchLabel),
}
}
// promMetricsObserver is an observer for prometheus metrics.
type promMetricsObserver struct {
nodeID string
label SegmentLabel // never updates
DiskCacheLoadTotal prometheus.Counter
DiskCacheLoadDuration prometheus.Counter
DiskCacheLoadBytes prometheus.Counter
DiskCacheEvictTotal prometheus.Counter
DiskCacheEvictBytes prometheus.Counter
DiskCacheEvictDuration prometheus.Counter
QuerySegmentAccessTotal prometheus.Counter
QuerySegmentAccessDuration prometheus.Counter
QuerySegmentAccessWaitCacheTotal prometheus.Counter
QuerySegmentAccessWaitCacheDuration prometheus.Counter
SearchSegmentAccessTotal prometheus.Counter
SearchSegmentAccessDuration prometheus.Counter
SearchSegmentAccessWaitCacheTotal prometheus.Counter
SearchSegmentAccessWaitCacheDuration prometheus.Counter
DiskCacheLoadGlobalDuration prometheus.Observer
DiskCacheEvictGlobalDuration prometheus.Observer
QuerySegmentAccessGlobalDuration prometheus.Observer
SearchSegmentAccessGlobalDuration prometheus.Observer
QuerySegmentAccessWaitCacheGlobalDuration prometheus.Observer
SearchSegmentAccessWaitCacheGlobalDuration prometheus.Observer
}
// ObserveCacheLoad records a new cache load.
func (o *promMetricsObserver) ObserveCacheLoad(r *CacheLoadRecord) {
o.DiskCacheLoadTotal.Inc()
o.DiskCacheLoadBytes.Add(r.getBytes())
d := r.getMilliseconds()
o.DiskCacheLoadDuration.Add(d)
o.DiskCacheLoadGlobalDuration.Observe(d)
}
// ObserveCacheEvict records a new cache eviction.
func (o *promMetricsObserver) ObserveCacheEvict(r *CacheEvictRecord) {
o.DiskCacheEvictTotal.Inc()
o.DiskCacheEvictBytes.Add(r.getBytes())
d := r.getMilliseconds()
o.DiskCacheEvictDuration.Add(d)
o.DiskCacheEvictGlobalDuration.Observe(d)
}
// ObserveQueryAccess records a new query access.
func (o *promMetricsObserver) ObserveQueryAccess(r QuerySegmentAccessRecord) {
o.QuerySegmentAccessTotal.Inc()
d := r.getMilliseconds()
o.QuerySegmentAccessDuration.Add(d)
o.QuerySegmentAccessGlobalDuration.Observe(d)
if r.isCacheMiss {
o.QuerySegmentAccessWaitCacheTotal.Inc()
d := r.getWaitLoadMilliseconds()
o.QuerySegmentAccessWaitCacheDuration.Add(d)
o.QuerySegmentAccessWaitCacheGlobalDuration.Observe(d)
}
}
// ObserveSearchAccess records a new search access.
func (o *promMetricsObserver) ObserveSearchAccess(r SearchSegmentAccessRecord) {
o.SearchSegmentAccessTotal.Inc()
d := r.getMilliseconds()
o.SearchSegmentAccessDuration.Add(d)
o.SearchSegmentAccessGlobalDuration.Observe(d)
if r.isCacheMiss {
o.SearchSegmentAccessWaitCacheTotal.Inc()
d := r.getWaitLoadMilliseconds()
o.SearchSegmentAccessWaitCacheDuration.Add(d)
o.SearchSegmentAccessWaitCacheGlobalDuration.Observe(d)
}
}
// Clear clears the prometheus metrics.
func (o *promMetricsObserver) Clear() {
label := o.label
metrics.QueryNodeDiskCacheLoadTotal.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup)
metrics.QueryNodeDiskCacheLoadBytes.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup)
metrics.QueryNodeDiskCacheLoadDuration.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup)
metrics.QueryNodeDiskCacheEvictTotal.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup)
metrics.QueryNodeDiskCacheEvictBytes.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup)
metrics.QueryNodeDiskCacheEvictDuration.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup)
metrics.QueryNodeSegmentAccessTotal.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel)
metrics.QueryNodeSegmentAccessTotal.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel)
metrics.QueryNodeSegmentAccessDuration.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel)
metrics.QueryNodeSegmentAccessDuration.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel)
metrics.QueryNodeSegmentAccessWaitCacheTotal.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel)
metrics.QueryNodeSegmentAccessWaitCacheTotal.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel)
metrics.QueryNodeSegmentAccessWaitCacheDuration.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.SearchLabel)
metrics.QueryNodeSegmentAccessWaitCacheDuration.DeleteLabelValues(o.nodeID, label.DatabaseName, label.ResourceGroup, metrics.QueryLabel)
}
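
The observer in this file is a write-only funnel: records created by the cache and the search/query paths call Finish, which routes the labeled sample to the process-wide singleton, and labels that stay idle are deleted from prometheus to keep series cardinality bounded. A short in-package sketch of the flow, assuming only the types defined here (the explicit Expire call is normally driven by the internal 15-minute ticker):

```go
// Illustrative only; conceptually inside package metricsutil.
func exampleObserverFlow() {
	label := SegmentLabel{DatabaseName: "db1", ResourceGroup: "rg1"}

	r := NewQuerySegmentAccessRecord(label)
	// ... execute the query against the segment ...
	r.Finish(nil) // internally calls getGlobalObserver().Observe(r)

	// Idle labels are eventually expired, which calls promMetricsObserver.Clear
	// and deletes the per-label prometheus series.
	getGlobalObserver().Expire(time.Now().Add(-15 * time.Minute))
}
```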


@ -0,0 +1,64 @@
package metricsutil
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSegmentGather(t *testing.T) {
l := SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
}
g := newSegmentObserver("1", l)
r1 := NewCacheLoadRecord(l)
g.Observe(r1)
r2 := NewCacheEvictRecord(l)
g.Observe(r2)
r3 := NewQuerySegmentAccessRecord(l)
g.Observe(r3)
r4 := NewSearchSegmentAccessRecord(l)
g.Observe(r4)
// test observe panic.
assert.Panics(t, func() {
g.Observe(&QuerySegmentAccessRecord{})
})
assert.False(t, g.IsExpired(time.Now().Add(-30*time.Second)))
assert.True(t, g.IsExpired(time.Now()))
// Clear should be ok.
g.Clear()
}
func TestSegmentsGather(t *testing.T) {
g := newSegmentsObserver()
r1 := NewQuerySegmentAccessRecord(SegmentLabel{
ResourceGroup: "rg1",
DatabaseName: "db1",
})
r1.Finish(nil)
g.Observe(r1)
assert.Equal(t, 1, g.segments.Len())
r2 := NewSearchSegmentAccessRecord(SegmentLabel{
ResourceGroup: "rg2",
DatabaseName: "db1",
})
r2.Finish(nil)
g.Observe(r2)
assert.Equal(t, 2, g.segments.Len())
g.Expire(time.Now().Add(-time.Minute))
assert.Equal(t, 2, g.segments.Len())
g.Expire(time.Now())
assert.Zero(t, g.segments.Len())
}


@ -0,0 +1,185 @@
package metricsutil
import (
"time"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
var (
_ labeledRecord = QuerySegmentAccessRecord{}
_ labeledRecord = SearchSegmentAccessRecord{}
_ labeledRecord = &CacheLoadRecord{}
_ labeledRecord = &CacheEvictRecord{}
)
// SegmentLabel is the label of a segment.
type SegmentLabel struct {
DatabaseName string `expr:"DatabaseName"`
ResourceGroup string `expr:"ResourceGroup"`
}
// CacheLoadRecord records the metrics of a cache load.
type CacheLoadRecord struct {
numBytes uint64
baseRecord
}
// NewCacheLoadRecord creates a new CacheLoadRecord.
func NewCacheLoadRecord(label SegmentLabel) *CacheLoadRecord {
return &CacheLoadRecord{
baseRecord: newBaseRecord(label),
}
}
// WithBytes sets the bytes of the record.
func (r *CacheLoadRecord) WithBytes(bytes uint64) *CacheLoadRecord {
r.numBytes = bytes
return r
}
// getBytes returns the bytes of the record.
func (r *CacheLoadRecord) getBytes() float64 {
return float64(r.numBytes)
}
// Finish finishes the record.
func (r *CacheLoadRecord) Finish(err error) {
r.baseRecord.finish(err)
getGlobalObserver().Observe(r)
}
type CacheEvictRecord struct {
bytes uint64
baseRecord
}
// NewCacheEvictRecord creates a new CacheEvictRecord.
func NewCacheEvictRecord(label SegmentLabel) *CacheEvictRecord {
return &CacheEvictRecord{
baseRecord: newBaseRecord(label),
}
}
// WithBytes sets the bytes of the record.
func (r *CacheEvictRecord) WithBytes(bytes uint64) *CacheEvictRecord {
r.bytes = bytes
return r
}
// getBytes returns the bytes of the record.
func (r *CacheEvictRecord) getBytes() float64 {
return float64(r.bytes)
}
// Finish finishes the record.
func (r *CacheEvictRecord) Finish(err error) {
r.baseRecord.finish(err)
getGlobalObserver().Observe(r)
}
// NewQuerySegmentAccessRecord creates a new QuerySegmentAccessRecord.
func NewQuerySegmentAccessRecord(label SegmentLabel) QuerySegmentAccessRecord {
return QuerySegmentAccessRecord{
segmentAccessRecord: newSegmentAccessRecord(label),
}
}
// NewSearchSegmentAccessRecord creates a new SearchSegmentAccessRecord.
func NewSearchSegmentAccessRecord(label SegmentLabel) SearchSegmentAccessRecord {
return SearchSegmentAccessRecord{
segmentAccessRecord: newSegmentAccessRecord(label),
}
}
// QuerySegmentAccessRecord records the metrics of a query segment.
type QuerySegmentAccessRecord struct {
*segmentAccessRecord
}
func (r QuerySegmentAccessRecord) Finish(err error) {
r.finish(err)
getGlobalObserver().Observe(r)
}
// SearchSegmentAccessRecord records the metrics of a search segment.
type SearchSegmentAccessRecord struct {
*segmentAccessRecord
}
func (r SearchSegmentAccessRecord) Finish(err error) {
r.finish(err)
getGlobalObserver().Observe(r)
}
// segmentAccessRecord records the metrics of the segment.
type segmentAccessRecord struct {
isCacheMiss bool // whether the access is a cache miss.
waitLoadCost time.Duration // time cost of waiting for loading data.
baseRecord
}
// newSegmentAccessRecord creates a new segmentAccessRecord.
func newSegmentAccessRecord(label SegmentLabel) *segmentAccessRecord {
return &segmentAccessRecord{
baseRecord: newBaseRecord(label),
}
}
// CacheMissing records a cache miss.
func (r *segmentAccessRecord) CacheMissing() {
r.isCacheMiss = true
r.waitLoadCost = r.timeRecorder.RecordSpan()
}
// getWaitLoadMilliseconds returns the wait-load duration of the recorder in milliseconds.
func (r *segmentAccessRecord) getWaitLoadMilliseconds() float64 {
return r.waitLoadCost.Seconds() * 1000
}
// getWaitLoadDuration returns the wait load duration of the recorder.
func (r *segmentAccessRecord) getWaitLoadDuration() time.Duration {
return r.waitLoadCost
}
// newBaseRecord returns a new baseRecord.
func newBaseRecord(label SegmentLabel) baseRecord {
return baseRecord{
label: label,
timeRecorder: timerecord.NewTimeRecorder(""),
}
}
// baseRecord records the metrics of the segment.
type baseRecord struct {
label SegmentLabel
duration time.Duration
err error
timeRecorder *timerecord.TimeRecorder
}
// Label returns the label of the recorder.
func (r *baseRecord) Label() SegmentLabel {
return r.label
}
// getError returns the error of the recorder.
func (r *baseRecord) getError() error {
return r.err
}
// getDuration returns the duration of the recorder.
func (r *baseRecord) getDuration() time.Duration {
return r.duration
}
// getMilliseconds returns the duration of the recorder in milliseconds.
func (r *baseRecord) getMilliseconds() float64 {
return r.duration.Seconds() * 1000
}
// finish finishes the record.
func (r *baseRecord) finish(err error) {
r.err = err
r.duration = r.timeRecorder.ElapseSpan()
}
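
Both durations exported for an access come from the same timerecord.TimeRecorder carried by baseRecord: CacheMissing snapshots the span spent waiting for the segment to become resident, and finish snapshots the total elapsed time. A minimal in-package sketch of the intended call order (the comments state my reading of the TimeRecorder semantics, not guarantees from this diff):

```go
// Illustrative only; conceptually inside package metricsutil.
func exampleAccessTiming(label SegmentLabel) {
	r := NewSearchSegmentAccessRecord(label)

	// On a cache miss, record how long the access waited for the load;
	// this feeds the *SegmentAccessWaitCache* counters and histogram.
	r.CacheMissing()

	// ... run the search on the now-resident segment ...

	// Total access time (wait + execution) feeds the *SegmentAccess* metrics.
	r.Finish(nil)
}
```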


@ -0,0 +1,100 @@
package metricsutil
import (
"os"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var testLabel = SegmentLabel{
DatabaseName: "db",
ResourceGroup: "rg",
}
func TestMain(m *testing.M) {
paramtable.Init()
os.Exit(m.Run())
}
func TestBaseRecord(t *testing.T) {
r := newBaseRecord(testLabel)
assert.Equal(t, testLabel, r.Label())
err := errors.New("test")
r.finish(err)
assert.Equal(t, err, r.getError())
assert.NotZero(t, r.getDuration())
assert.NotZero(t, r.getMilliseconds())
}
func TestSegmentAccessRecorder(t *testing.T) {
mr := newSegmentAccessRecord(SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
})
assert.Equal(t, mr.Label(), SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
})
assert.False(t, mr.isCacheMiss)
assert.Zero(t, mr.waitLoadCost)
assert.Zero(t, mr.getDuration())
mr.CacheMissing()
assert.True(t, mr.isCacheMiss)
assert.NotZero(t, mr.waitLoadCost)
assert.Zero(t, mr.getDuration())
mr.finish(nil)
assert.NotZero(t, mr.getDuration())
mr = newSegmentAccessRecord(SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
})
mr.CacheMissing()
assert.True(t, mr.isCacheMiss)
assert.NotZero(t, mr.waitLoadCost)
assert.Zero(t, mr.getDuration())
mr.finish(nil)
assert.NotZero(t, mr.getDuration())
mr = newSegmentAccessRecord(SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
})
mr.finish(nil)
assert.False(t, mr.isCacheMiss)
assert.Zero(t, mr.waitLoadCost)
assert.NotZero(t, mr.getDuration())
}
func TestSearchSegmentAccessMetric(t *testing.T) {
m := NewSearchSegmentAccessRecord(SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
})
m.CacheMissing()
m.Finish(nil)
assert.NotZero(t, m.getDuration())
}
func TestQuerySegmentAccessMetric(t *testing.T) {
m := NewQuerySegmentAccessRecord(SegmentLabel{
DatabaseName: "db1",
ResourceGroup: "rg1",
})
m.CacheMissing()
m.Finish(nil)
assert.NotZero(t, m.getDuration())
}
func TestCacheRecord(t *testing.T) {
r1 := NewCacheLoadRecord(testLabel)
r1.WithBytes(1)
assert.Equal(t, float64(1), r1.getBytes())
r1.Finish(nil)
r2 := NewCacheEvictRecord(testLabel)
r2.Finish(nil)
}


@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -65,10 +66,19 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
wg.Add(1)
go func(seg Segment, i int) {
defer wg.Done()
// record query access time and cache miss
var err error
accessRecord := metricsutil.NewQuerySegmentAccessRecord(getSegmentMetricLabel(seg))
defer func() {
accessRecord.Finish(err)
}()
if seg.IsLazyLoad() {
err = mgr.DiskCache.Do(seg.ID(), retriever)
var missing bool
missing, err = mgr.DiskCache.Do(seg.ID(), retriever)
if missing {
accessRecord.CacheMissing()
}
} else {
err = retriever(seg)
}


@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -77,8 +78,16 @@ func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segTy
mu.Unlock()
}
var err error
accessRecord := metricsutil.NewSearchSegmentAccessRecord(getSegmentMetricLabel(seg))
defer func() {
accessRecord.Finish(err)
}()
if seg.IsLazyLoad() {
err = mgr.DiskCache.Do(seg.ID(), searcher)
var missing bool
missing, err = mgr.DiskCache.Do(seg.ID(), searcher)
if missing {
accessRecord.CacheMissing()
}
} else {
err = searcher(seg)
}


@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -346,3 +347,11 @@ func getIndexEngineVersion() (minimal, current int32) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
return int32(cMinimal), int32(cCurrent)
}
// getSegmentMetricLabel returns the label for segment metrics.
func getSegmentMetricLabel(segment Segment) metricsutil.SegmentLabel {
return metricsutil.SegmentLabel{
DatabaseName: segment.DatabaseName(),
ResourceGroup: segment.ResourceGroup(),
}
}


@ -67,19 +67,23 @@ func (t *QueryTask) PreExecute() error {
// Update task wait time metric before execute
nodeID := strconv.FormatInt(paramtable.GetNodeID(), 10)
inQueueDuration := t.tr.ElapseSpan()
inQueueDurationMS := inQueueDuration.Seconds() * 1000
// Update in queue metric for prometheus.
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(
nodeID,
metrics.QueryLabel).
Observe(float64(inQueueDuration.Milliseconds()))
metrics.QueryLabel,
t.collection.GetDBName(),
t.collection.GetResourceGroup(), // TODO: resource group and db name may be removed at runtime.
// should be refactored into metricsutil.observer in the future.
).Observe(inQueueDurationMS)
username := t.Username()
metrics.QueryNodeSQPerUserLatencyInQueue.WithLabelValues(
nodeID,
metrics.QueryLabel,
username).
Observe(float64(inQueueDuration.Milliseconds()))
Observe(inQueueDurationMS)
// Update collector for query node quota.
collector.Average.Add(metricsinfo.QueryQueueMetric, float64(inQueueDuration.Microseconds()))
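
Both task hunks (this query task and the search task below) also switch the observed value from float64(inQueueDuration.Milliseconds()) to inQueueDuration.Seconds() * 1000. A small self-contained sketch of the difference: Milliseconds() truncates to whole milliseconds, while Seconds()*1000 keeps sub-millisecond resolution for the histogram.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 1500 * time.Microsecond
	fmt.Println(float64(d.Milliseconds())) // 1 — truncated to whole milliseconds
	fmt.Println(d.Seconds() * 1000)        // 1.5 — sub-millisecond resolution kept
}
```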


@ -98,19 +98,24 @@ func (t *SearchTask) PreExecute() error {
// Update task wait time metric before execute
nodeID := strconv.FormatInt(t.GetNodeID(), 10)
inQueueDuration := t.tr.ElapseSpan()
inQueueDurationMS := inQueueDuration.Seconds() * 1000
// Update in queue metric for prometheus.
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(
nodeID,
metrics.SearchLabel).
Observe(float64(inQueueDuration.Milliseconds()))
metrics.SearchLabel,
t.collection.GetDBName(),
t.collection.GetResourceGroup(),
// TODO: resource group and db name may be removed at runtime,
// should be refactored into metricsutil.observer in the future.
).Observe(inQueueDurationMS)
username := t.Username()
metrics.QueryNodeSQPerUserLatencyInQueue.WithLabelValues(
nodeID,
metrics.SearchLabel,
username).
Observe(float64(inQueueDuration.Milliseconds()))
Observe(inQueueDurationMS)
// Update collector for query node quota.
collector.Average.Add(metricsinfo.SearchQueueMetric, float64(inQueueDuration.Microseconds()))


@ -175,6 +175,8 @@ var (
}, []string{
nodeIDLabelName,
queryTypeLabelName,
databaseLabelName,
resourceGroupLabelName,
})
QueryNodeSQPerUserLatencyInQueue = prometheus.NewHistogramVec(
@ -502,6 +504,194 @@ var (
}, []string{
nodeIDLabelName,
})
// QueryNodeSegmentAccessTotal records the total number of search or query segments accessed.
QueryNodeSegmentAccessTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_access_total",
Help: "number of segments accessed",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
queryTypeLabelName,
},
)
// QueryNodeSegmentAccessDuration records the total time cost of accessing segments including cache loads.
QueryNodeSegmentAccessDuration = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_access_duration",
Help: "total time cost of accessing segments",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
queryTypeLabelName,
},
)
// QueryNodeSegmentAccessGlobalDuration records the global time cost of accessing segments.
QueryNodeSegmentAccessGlobalDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_access_global_duration",
Help: "global time cost of accessing segments",
Buckets: longTaskBuckets,
}, []string{
nodeIDLabelName,
queryTypeLabelName,
},
)
// QueryNodeSegmentAccessWaitCacheTotal records the number of search or query segment accesses that had to wait for the cache to load.
QueryNodeSegmentAccessWaitCacheTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_access_wait_cache_total",
Help: "number of segments waiting for loading access",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
queryTypeLabelName,
})
// QueryNodeSegmentAccessWaitCacheDuration records the total time cost of waiting for loading access.
QueryNodeSegmentAccessWaitCacheDuration = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_access_wait_cache_duration",
Help: "total time cost of waiting for loading access",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
queryTypeLabelName,
})
// QueryNodeSegmentAccessWaitCacheGlobalDuration records the global time cost of waiting for loading access.
QueryNodeSegmentAccessWaitCacheGlobalDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "segment_access_wait_cache_global_duration",
Help: "global time cost of waiting for loading access",
Buckets: longTaskBuckets,
}, []string{
nodeIDLabelName,
queryTypeLabelName,
})
// QueryNodeDiskCacheLoadTotal records the number of real segments loaded from disk cache.
QueryNodeDiskCacheLoadTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Help: "number of segments loaded from disk cache",
Name: "disk_cache_load_total",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
})
// QueryNodeDiskCacheLoadBytes records the number of bytes loaded from disk cache.
QueryNodeDiskCacheLoadBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Help: "number of bytes loaded from disk cache",
Name: "disk_cache_load_bytes",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
})
// QueryNodeDiskCacheLoadDuration records the total time cost of loading segments from disk cache.
// With db and resource group labels.
QueryNodeDiskCacheLoadDuration = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Help: "total time cost of loading segments from disk cache",
Name: "disk_cache_load_duration",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
})
// QueryNodeDiskCacheLoadGlobalDuration records the global time cost of loading segments from disk cache.
QueryNodeDiskCacheLoadGlobalDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "disk_cache_load_global_duration",
Help: "global duration of loading segments from disk cache",
Buckets: longTaskBuckets,
}, []string{
nodeIDLabelName,
})
// QueryNodeDiskCacheEvictTotal records the number of real segments evicted from disk cache.
QueryNodeDiskCacheEvictTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "disk_cache_evict_total",
Help: "number of segments evicted from disk cache",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
})
// QueryNodeDiskCacheEvictBytes records the number of bytes evicted from disk cache.
QueryNodeDiskCacheEvictBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "disk_cache_evict_bytes",
Help: "number of bytes evicted from disk cache",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
})
// QueryNodeDiskCacheEvictDuration records the total time cost of evicting segments from disk cache.
QueryNodeDiskCacheEvictDuration = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "disk_cache_evict_duration",
Help: "total time cost of evicting segments from disk cache",
}, []string{
nodeIDLabelName,
databaseLabelName,
resourceGroupLabelName,
})
// QueryNodeDiskCacheEvictGlobalDuration records the global time cost of evicting segments from disk cache.
QueryNodeDiskCacheEvictGlobalDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "disk_cache_evict_global_duration",
Help: "global duration of evicting segments from disk cache",
Buckets: longTaskBuckets,
}, []string{
nodeIDLabelName,
})
)
// RegisterQueryNode registers QueryNode metrics
@ -549,6 +739,20 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(StoppingBalanceSegmentNum)
registry.MustRegister(QueryNodeLoadSegmentConcurrency)
registry.MustRegister(QueryNodeLoadIndexLatency)
registry.MustRegister(QueryNodeSegmentAccessTotal)
registry.MustRegister(QueryNodeSegmentAccessDuration)
registry.MustRegister(QueryNodeSegmentAccessGlobalDuration)
registry.MustRegister(QueryNodeSegmentAccessWaitCacheTotal)
registry.MustRegister(QueryNodeSegmentAccessWaitCacheDuration)
registry.MustRegister(QueryNodeSegmentAccessWaitCacheGlobalDuration)
registry.MustRegister(QueryNodeDiskCacheLoadTotal)
registry.MustRegister(QueryNodeDiskCacheLoadBytes)
registry.MustRegister(QueryNodeDiskCacheLoadDuration)
registry.MustRegister(QueryNodeDiskCacheLoadGlobalDuration)
registry.MustRegister(QueryNodeDiskCacheEvictTotal)
registry.MustRegister(QueryNodeDiskCacheEvictBytes)
registry.MustRegister(QueryNodeDiskCacheEvictDuration)
registry.MustRegister(QueryNodeDiskCacheEvictGlobalDuration)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {


@ -78,12 +78,12 @@ type Cache[K comparable, V any] interface {
// completes.
// Throws `ErrNoSuchItem` if the key is not found or not able to be loaded from given loader.
// Throws `ErrNotEnoughSpace` if there is no room for the operation.
Do(key K, doer func(V) error) error
Do(key K, doer func(V) error) (missing bool, err error)
// Do the operation `doer` on the given key `key`. The key is kept in the cache until the operation
// completes. The function waits for `timeout` if there is not enough space for the given key.
// Throws `ErrNoSuchItem` if the key is not found or not able to be loaded from given loader.
// Throws `ErrTimeOut` if timed out.
DoWait(key K, timeout time.Duration, doer func(V) error) error
DoWait(key K, timeout time.Duration, doer func(V) error) (missing bool, err error)
}
type Waiter[K comparable] struct {
@ -178,16 +178,16 @@ func newLRUCache[K comparable, V any](
}
// Do picks up an item from cache and executes doer. The entry of interest is guaranteed to be in the cache while doer is executing.
func (c *lruCache[K, V]) Do(key K, doer func(V) error) error {
item, err := c.getAndPin(key)
func (c *lruCache[K, V]) Do(key K, doer func(V) error) (bool, error) {
item, missing, err := c.getAndPin(key)
if err != nil {
return err
return missing, err
}
defer c.Unpin(key)
return doer(item.value)
return missing, doer(item.value)
}
func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error) error {
func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error) (bool, error) {
timedWait := func(cond *sync.Cond, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
@ -207,7 +207,7 @@ func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error
var ele *list.Element
start := time.Now()
for {
item, err := c.getAndPin(key)
item, missing, err := c.getAndPin(key)
if err == nil {
if ele != nil {
c.rwlock.Lock()
@ -215,9 +215,9 @@ func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error
c.rwlock.Unlock()
}
defer c.Unpin(key)
return doer(item.value)
return missing, doer(item.value)
} else if err != ErrNotEnoughSpace {
return err
return true, err
}
if ele == nil {
// If there is not enough space, enqueue the key
@ -229,7 +229,7 @@ func (c *lruCache[K, V]) DoWait(key K, timeout time.Duration, doer func(V) error
// Wait for the key to be available
timeLeft := time.Until(start.Add(timeout))
if timeLeft <= 0 || timedWait(ele.Value.(*Waiter[K]).c, timeLeft) {
return ErrTimeOut
return true, ErrTimeOut
}
}
}
@ -269,16 +269,16 @@ func (c *lruCache[K, V]) peekAndPin(key K) *cacheItem[K, V] {
}
// getAndPin gets and pins the given key if it exists.
func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], bool, error) {
if item := c.peekAndPin(key); item != nil {
return item, nil
return item, false, nil
}
if c.loader != nil {
// Try scavenge if there is room. If not, fail fast.
// Note that the test is not accurate since we are not locking `loader` here.
if _, ok := c.tryScavenge(key); !ok {
return nil, ErrNotEnoughSpace
return nil, true, ErrNotEnoughSpace
}
strKey := fmt.Sprint(key)
@ -300,12 +300,12 @@ func (c *lruCache[K, V]) getAndPin(key K) (*cacheItem[K, V], error) {
})
if err == nil {
return item.(*cacheItem[K, V]), nil
return item.(*cacheItem[K, V]), true, nil
}
return nil, err
return nil, true, err
}
return nil, ErrNoSuchItem
return nil, true, ErrNoSuchItem
}
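
With getAndPin reporting whether the item was already resident, Do and DoWait can surface a missing flag to callers; the search and retrieve hunks earlier in this PR use it to mark a cache miss on the access record. A hedged sketch of consuming the new return value (the onCacheMiss callback is a hypothetical stand-in for accessRecord.CacheMissing()):

```go
// Illustrative helper, assumed to live next to the Cache interface.
func doWithMissTracking[K comparable, V any](c Cache[K, V], key K, doer func(V) error, onCacheMiss func()) error {
	missing, err := c.Do(key, doer)
	if missing {
		// The item was not already pinned in the cache: the access either
		// waited for a load or the load failed.
		onCacheMiss()
	}
	return err
}
```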
func (c *lruCache[K, V]) tryScavenge(key K) ([]K, bool) {


@ -21,10 +21,11 @@ func TestLRUCache(t *testing.T) {
cache := cacheBuilder.WithCapacity(int64(size)).Build()
for i := 0; i < size; i++ {
err := cache.Do(i, func(v int) error {
missing, err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.True(t, missing)
assert.NoError(t, err)
}
})
@ -38,20 +39,22 @@ func TestLRUCache(t *testing.T) {
}).Build()
for i := 0; i < size*2; i++ {
err := cache.Do(i, func(v int) error {
missing, err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.True(t, missing)
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, finalizeSeq)
// Hit the cache again, there should be no swap-out
for i := size; i < size*2; i++ {
err := cache.Do(i, func(v int) error {
missing, err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.False(t, missing)
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, finalizeSeq)
@ -68,10 +71,11 @@ func TestLRUCache(t *testing.T) {
}).Build()
for i := 0; i < 20; i++ {
err := cache.Do(i, func(v int) error {
missing, err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.True(t, missing)
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, finalizeSeq)
@ -80,9 +84,10 @@ func TestLRUCache(t *testing.T) {
t.Run("test do negative", func(t *testing.T) {
cache := cacheBuilder.Build()
theErr := errors.New("error")
err := cache.Do(-1, func(v int) error {
missing, err := cache.Do(-1, func(v int) error {
return theErr
})
assert.True(t, missing)
assert.Equal(t, theErr, err)
})
@ -97,16 +102,18 @@ func TestLRUCache(t *testing.T) {
}).Build()
for i := 0; i < 20; i++ {
err := cache.Do(i, func(v int) error {
missing, err := cache.Do(i, func(v int) error {
assert.Equal(t, i, v)
return nil
})
assert.True(t, missing)
assert.NoError(t, err)
}
assert.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18}, finalizeSeq)
err := cache.Do(100, func(v int) error {
missing, err := cache.Do(100, func(v int) error {
return nil
})
assert.True(t, missing)
assert.Equal(t, ErrNotEnoughSpace, err)
})
@ -117,13 +124,15 @@ func TestLRUCache(t *testing.T) {
}
return key, true
}).Build()
err := cache.Do(0, func(v int) error {
missing, err := cache.Do(0, func(v int) error {
return nil
})
assert.True(t, missing)
assert.NoError(t, err)
err = cache.Do(-1, func(v int) error {
missing, err = cache.Do(-1, func(v int) error {
return nil
})
assert.True(t, missing)
assert.Equal(t, ErrNoSuchItem, err)
})
}
@ -144,7 +153,7 @@ func TestLRUCacheConcurrency(t *testing.T) {
go func(i int) {
defer wg.Done()
for j := 0; j < 100; j++ {
err := cache.Do(j, func(v int) error {
_, err := cache.Do(j, func(v int) error {
return nil
})
assert.NoError(t, err)
@ -171,7 +180,7 @@ func TestLRUCacheConcurrency(t *testing.T) {
return nil
})
wg1.Wait()
err := cache.Do(1001, func(v int) error {
_, err := cache.Do(1001, func(v int) error {
return nil
})
wg.Done()
@ -195,10 +204,11 @@ func TestLRUCacheConcurrency(t *testing.T) {
return nil
})
wg1.Wait()
err := cache.DoWait(1001, time.Nanosecond, func(v int) error {
missing, err := cache.DoWait(1001, time.Nanosecond, func(v int) error {
return nil
})
wg.Done()
assert.True(t, missing)
assert.Equal(t, ErrTimeOut, err)
})
@ -218,9 +228,10 @@ func TestLRUCacheConcurrency(t *testing.T) {
return nil
})
wg1.Wait()
err := cache.DoWait(1001, time.Second*2, func(v int) error {
missing, err := cache.DoWait(1001, time.Second*2, func(v int) error {
return nil
})
assert.True(t, missing)
assert.NoError(t, err)
})
@ -239,7 +250,7 @@ func TestLRUCacheConcurrency(t *testing.T) {
go func(i int) {
defer wg.Done()
for j := 0; j < 20; j++ {
err := cache.DoWait(j, time.Second, func(v int) error {
_, err := cache.DoWait(j, time.Second, func(v int) error {
time.Sleep(time.Duration(rand.Intn(3)))
return nil
})