fix: add back existing datanode metrics (#29360)

See also: #29204

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/29422/head
XuanYang-cn 2023-12-22 14:20:43 +08:00 committed by GitHub
parent 67ab0e424b
commit 7a6aa8552a
6 changed files with 75 additions and 28 deletions


@@ -69,7 +69,7 @@ func newDmInputNode(initCtx context.Context, dispatcherClient msgdispatcher.Clie
Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32(),
Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32(),
typeutil.DataNodeRole,
paramtable.GetNodeID(),
dmNodeConfig.serverID,
dmNodeConfig.collectionID,
metrics.AllLabel,
)
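
For orientation, here is a minimal sketch (not part of the diff) of the labelling pattern this change threads through newDmInputNode: the input node's consume counter is keyed by the serving node's ID and the collection ID carried in the flowgraph config, rather than a globally looked-up node ID. The package, metric name, and helper below are illustrative placeholders, not the real identifiers in pkg/metrics.

package sketch

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// consumeMsgCount is a stand-in for the per-node, per-collection consume counter.
var consumeMsgCount = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "datanode_consume_msg_count", // hypothetical metric name
}, []string{"node_id", "msg_type", "collection_id"})

// recordConsume increments the counter with the server ID and collection ID
// that the node config carries, mirroring dmNodeConfig.serverID/collectionID.
func recordConsume(serverID, collectionID int64, msgType string) {
	consumeMsgCount.
		WithLabelValues(fmt.Sprint(serverID), msgType, fmt.Sprint(collectionID)).
		Inc()
}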


@@ -37,6 +37,7 @@ func (d *keyLockDispatcher[K]) Submit(key K, t Task, callbacks ...func(error)) *
for _, callback := range callbacks {
callback(err)
}
return err, nil
})
}
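
The one-line addition above makes the submitted closure hand the task error back through the returned future as well as to the callbacks. A simplified, self-contained sketch of that pattern follows; the pool and future types here are stand-ins assumed for illustration, not the internal conc package.

package sketch

import "sync"

// future is a minimal stand-in for an async result: Await blocks until the task finishes.
type future[T any] struct {
	wg  sync.WaitGroup
	val T
}

func (f *future[T]) Await() T { f.wg.Wait(); return f.val }

// submit runs the task asynchronously, fires the callbacks with its error,
// and also surfaces that error through the future (the role of `return err, nil` above).
func submit(run func() error, callbacks ...func(error)) *future[error] {
	f := &future[error]{}
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		err := run()
		for _, cb := range callbacks {
			cb(err)
		}
		f.val = err
	}()
	return f
}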


@@ -2,6 +2,7 @@ package syncmgr
import (
"context"
"fmt"
"path"
"strconv"
@@ -18,9 +19,12 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -62,6 +66,8 @@ type SyncTask struct {
writeRetryOpts []retry.Option
failureCallback func(err error)
tr *timerecord.TimeRecorder
}
func (t *SyncTask) getLogger() *log.MLogger {
@@ -73,13 +79,21 @@ func (t *SyncTask) getLogger() *log.MLogger {
)
}
func (t *SyncTask) handleError(err error) {
func (t *SyncTask) handleError(err error, metricSegLevel string) {
if t.failureCallback != nil {
t.failureCallback(err)
}
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, metricSegLevel).Inc()
if !t.isFlush {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, metricSegLevel).Inc()
}
}
func (t *SyncTask) Run() error {
t.tr = timerecord.NewTimeRecorder("syncTask")
var metricSegLevel string = t.level.String()
log := t.getLogger()
var err error
var has bool
@@ -88,7 +102,7 @@ func (t *SyncTask) Run() error {
if !has {
log.Warn("failed to sync data, segment not found in metacache")
err := merr.WrapErrSegmentNotFound(t.segmentID)
t.handleError(err)
t.handleError(err, metricSegLevel)
return err
}
@@ -107,29 +121,42 @@ func (t *SyncTask) Run() error {
err = t.serializeInsertData()
if err != nil {
log.Warn("failed to serialize insert data", zap.Error(err))
t.handleError(err)
t.handleError(err, metricSegLevel)
return err
}
err = t.serializeDeleteData()
if err != nil {
log.Warn("failed to serialize delete data", zap.Error(err))
t.handleError(err)
t.handleError(err, metricSegLevel)
return err
}
var totalSize float64 = 0
if t.deleteData != nil {
totalSize += float64(t.deleteData.Size())
}
if t.insertData != nil {
totalSize += float64(t.insertData.GetMemorySize())
}
metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, metricSegLevel).Add(totalSize)
metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds()))
err = t.writeLogs()
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
t.handleError(err)
t.handleError(err, metricSegLevel)
return err
}
metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds()))
if t.metaWriter != nil {
err = t.writeMeta()
if err != nil {
log.Warn("failed to save serialized data into storage", zap.Error(err))
t.handleError(err)
t.handleError(err, metricSegLevel)
return err
}
}
@@ -145,6 +172,11 @@ func (t *SyncTask) Run() error {
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID()))
log.Info("task done")
if !t.isFlush {
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, metricSegLevel).Inc()
}
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, metricSegLevel).Inc()
return nil
}
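
Taken together, the sync task changes restore per-segment-level accounting: every exit path calls handleError with the segment level so the failure counters carry a level label, and the success path bumps the same counters with a success status. A condensed sketch of that pattern (metric name and labels are simplified placeholders, not the real declarations in pkg/metrics):

package sketch

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var flushBufferCount = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "datanode_flush_buffer_op_count", // hypothetical name
}, []string{"node_id", "status", "segment_level"})

// recordSyncResult mirrors the Run/handleError split: one increment per task,
// labelled fail on any error path and success once the task completes.
func recordSyncResult(nodeID int64, segLevel string, err error) {
	status := "success"
	if err != nil {
		status = "fail"
	}
	flushBufferCount.WithLabelValues(fmt.Sprint(nodeID), status, segLevel).Inc()
}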


@@ -24,9 +24,11 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -324,6 +326,11 @@ func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, start
segmentPKData[segmentID] = pkData
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(segmentID))
totalSize := lo.SumBy(pkData, func(iData storage.FieldData) float64 {
return float64(iData.GetMemorySize())
})
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(totalSize)
}
return segmentPKData, nil
@@ -332,7 +339,8 @@ func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, start
// bufferDelete buffers DeleteMsg into DeleteData.
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) error {
segBuf := wb.getOrCreateBuffer(segmentID)
segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
bufSize := segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(bufSize))
return nil
}
@@ -381,6 +389,7 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn
return nil
}
var batchSize int64
var totalMemSize float64
var tsFrom, tsTo uint64
insert, delta, timeRange, startPos := wb.yieldBuffer(segmentID)
@@ -391,6 +400,10 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn
actions := []metacache.SegmentAction{metacache.RollStats()}
if insert != nil {
batchSize = int64(insert.GetRowNum())
totalMemSize += float64(insert.GetMemorySize())
}
if delta != nil {
totalMemSize += float64(delta.Size())
}
actions = append(actions, metacache.StartSyncing(batchSize))
wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID))
@@ -455,6 +468,8 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn
syncTask = task
}
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize)
return syncTask
}
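
The write buffer hunks restore the in-memory buffer gauge: it grows (Add) as insert or delete data is buffered and shrinks (Sub) by the same amount once a segment's buffers are yielded into a sync task, so the gauge tracks what is currently held in memory per collection. A minimal sketch of that Add/Sub bookkeeping, with illustrative names rather than the real DataNodeFlowGraphBufferDataSize declaration:

package sketch

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var bufferDataSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "datanode_fg_buffer_size_bytes", // hypothetical name
}, []string{"node_id", "collection_id"})

// onBuffered grows the gauge as data enters the write buffer.
func onBuffered(nodeID, collectionID int64, bytes float64) {
	bufferDataSize.WithLabelValues(fmt.Sprint(nodeID), fmt.Sprint(collectionID)).Add(bytes)
}

// onYieldToSync shrinks it by the memory handed off to the sync task.
func onYieldToSync(nodeID, collectionID int64, bytes float64) {
	bufferDataSize.WithLabelValues(fmt.Sprint(nodeID), fmt.Sprint(collectionID)).Sub(bytes)
}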


@@ -115,16 +115,6 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
inNode.lastMsg = msgPack
sub := tsoutil.SubByNow(msgPack.EndTs)
if inNode.role == typeutil.QueryNodeRole {
metrics.QueryNodeConsumerMsgCount.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
Inc()
metrics.QueryNodeConsumeTimeTickLag.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
Set(float64(sub))
}
if inNode.role == typeutil.DataNodeRole {
metrics.DataNodeConsumeMsgCount.
WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).


@@ -55,6 +55,7 @@ var (
}, []string{
nodeIDLabelName,
msgTypeLabelName,
segmentLevelLabelName,
})
DataNodeNumProducers = prometheus.NewGaugeVec(
@@ -112,6 +113,7 @@ var (
Buckets: buckets,
}, []string{
nodeIDLabelName,
segmentLevelLabelName,
})
DataNodeSave2StorageLatency = prometheus.NewHistogramVec(
@@ -135,6 +137,7 @@ var (
}, []string{
nodeIDLabelName,
statusLabelName,
segmentLevelLabelName,
})
DataNodeAutoFlushBufferCount = prometheus.NewCounterVec( // TODO: arguably
@@ -146,6 +149,7 @@ var (
}, []string{
nodeIDLabelName,
statusLabelName,
segmentLevelLabelName,
})
DataNodeCompactionLatency = prometheus.NewHistogramVec(
@@ -226,23 +230,28 @@ var (
// RegisterDataNode registers DataNode metrics
func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeNumFlowGraphs)
// input related
registry.MustRegister(DataNodeConsumeMsgRowsCount)
registry.MustRegister(DataNodeFlushedSize)
registry.MustRegister(DataNodeNumProducers)
registry.MustRegister(DataNodeConsumeTimeTickLag)
registry.MustRegister(DataNodeMsgDispatcherTtLag)
registry.MustRegister(DataNodeConsumeMsgCount)
registry.MustRegister(DataNodeConsumeBytesCount)
// in memory
registry.MustRegister(DataNodeFlowGraphBufferDataSize)
// output related
registry.MustRegister(DataNodeAutoFlushBufferCount)
registry.MustRegister(DataNodeEncodeBufferLatency)
registry.MustRegister(DataNodeSave2StorageLatency)
registry.MustRegister(DataNodeFlushBufferCount)
registry.MustRegister(DataNodeAutoFlushBufferCount)
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeFlushReqCounter)
registry.MustRegister(DataNodeConsumeMsgCount)
registry.MustRegister(DataNodeProduceTimeTickLag)
registry.MustRegister(DataNodeConsumeBytesCount)
registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
registry.MustRegister(DataNodeMsgDispatcherTtLag)
registry.MustRegister(DataNodeFlushedSize)
// compaction related
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeCompactionLatencyInQueue)
registry.MustRegister(DataNodeFlowGraphBufferDataSize)
// deprecated metrics
registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
registry.MustRegister(DataNodeNumProducers)
registry.MustRegister(DataNodeProduceTimeTickLag)
}
func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
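
For completeness, a hedged sketch of how a registry populated by RegisterDataNode-style MustRegister calls is typically exposed over HTTP; the handler path and port below are assumptions for the example, not taken from this change.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	registry := prometheus.NewRegistry()
	// metrics.RegisterDataNode(registry) would be called here in the real wiring.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9091", nil) // assumed port for illustration
}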