milvus/internal/streamingnode/server/wal/metricsutil/wal_write.go

108 lines
3.2 KiB
Go

package metricsutil
import (
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// NewWriteMetrics builds the write-path metrics for the given pchannel.
//
// It sets the WALInfo series labelled with (node ID, channel name, channel
// term, walName) to 1, and curries every append metric vector with the node ID
// and channel name so that later observations only need to supply the
// remaining labels. MustCurryWith panics if the labels cannot be curried.
func NewWriteMetrics(pchannel types.PChannelInfo, walName string) *WriteMetrics {
	labels := prometheus.Labels{
		metrics.NodeIDLabelName:     paramtable.GetStringNodeID(),
		metrics.WALChannelLabelName: pchannel.Name,
	}

	// Publish the WAL info series for this (node, channel, term, wal) tuple.
	info := metrics.WALInfo.WithLabelValues(
		paramtable.GetStringNodeID(),
		pchannel.Name,
		strconv.FormatInt(pchannel.Term, 10),
		walName,
	)
	info.Set(1)

	wm := &WriteMetrics{
		walName:    walName,
		pchannel:   pchannel,
		constLabel: labels,
	}
	wm.bytes = metrics.WALAppendMessageBytes.MustCurryWith(labels)
	wm.total = metrics.WALAppendMessageTotal.MustCurryWith(labels)
	wm.walDuration = metrics.WALAppendMessageDurationSeconds.MustCurryWith(labels)
	wm.walimplsDuration = metrics.WALImplsAppendMessageDurationSeconds.MustCurryWith(labels)
	return wm
}
// WriteMetrics bundles the metric handles used to record append operations on
// a single pchannel. Create it with NewWriteMetrics and release its series
// with Close.
type WriteMetrics struct {
// walName is the name passed to NewWriteMetrics; it is used as a label
// value of the WALInfo series.
walName string
// pchannel supplies the channel name and term used as label values.
pchannel types.PChannelInfo
// constLabel holds the node ID and channel name labels; it is used both to
// curry the vectors below and to delete their series in Close.
constLabel prometheus.Labels
// bytes is WALAppendMessageBytes curried with constLabel; Finish observes
// the guard's bytes value on it.
bytes prometheus.ObserverVec
// total is WALAppendMessageTotal curried with constLabel; Finish increments
// it per message type and status.
total *prometheus.CounterVec
// walDuration is WALAppendMessageDurationSeconds curried with constLabel;
// Finish observes the time elapsed since StartAppend on it.
walDuration prometheus.ObserverVec
// walimplsDuration is WALImplsAppendMessageDurationSeconds curried with
// constLabel; Finish observes the guard's implCost on it.
walimplsDuration prometheus.ObserverVec
}
// StartAppend captures the current time and returns a WriteGuard for one
// append of a message with the given type and size. Call Finish on the
// returned guard to record the metrics.
func (m *WriteMetrics) StartAppend(msgType message.MessageType, bytes int) *WriteGuard {
	// Take the timestamp before anything else so it marks the call entry.
	begin := time.Now()

	guard := new(WriteGuard)
	guard.startAppend = begin
	guard.metrics = m
	guard.msgType = msgType
	guard.bytes = bytes
	return guard
}
// Close deletes the series created through this WriteMetrics: every append
// metric series whose labels match constLabel, and the WALInfo series for this
// node ID, channel name, channel term and WAL name.
func (m *WriteMetrics) Close() {
	labels := m.constLabel
	metrics.WALAppendMessageBytes.DeletePartialMatch(labels)
	metrics.WALAppendMessageTotal.DeletePartialMatch(labels)
	metrics.WALAppendMessageDurationSeconds.DeletePartialMatch(labels)
	metrics.WALImplsAppendMessageDurationSeconds.DeletePartialMatch(labels)

	// Rebuild the same label values NewWriteMetrics used for WALInfo.
	nodeID := paramtable.GetStringNodeID()
	channel := m.pchannel.Name
	term := strconv.FormatInt(m.pchannel.Term, 10)
	metrics.WALInfo.DeleteLabelValues(nodeID, channel, term, m.walName)
}
// WriteGuard tracks the timing of a single append started with
// WriteMetrics.StartAppend; Finish reports the collected values.
type WriteGuard struct {
// startAppend is the time at which StartAppend created this guard.
startAppend time.Time
// startImplAppend is the time recorded by StartWALImplAppend; it stays the
// zero time until that method is called.
startImplAppend time.Time
// implCost is the duration computed by FinishWALImplAppend; while it is
// zero, Finish skips the walimpls duration observation.
implCost time.Duration
// metrics is the WriteMetrics that created this guard and receives the
// observations.
metrics *WriteMetrics
// msgType is the message type used as a label on the total counter.
msgType message.MessageType
// bytes is the size passed to StartAppend, observed on the bytes metric.
bytes int
}
// StartWALImplAppend stores the current time as the start of the walimpls
// append phase; FinishWALImplAppend measures the elapsed time from it.
func (g *WriteGuard) StartWALImplAppend() {
	now := time.Now()
	g.startImplAppend = now
}
// FinishWALImplAppend stores in implCost the time elapsed since
// StartWALImplAppend was called.
//
// If StartWALImplAppend was never called, startImplAppend is still the zero
// time.Time and the elapsed time would be a meaningless, saturated duration.
// In that case implCost is left unchanged, so a guard that never started the
// walimpls phase keeps implCost at zero and Finish skips the walimpls duration
// observation instead of recording a bogus value.
func (g *WriteGuard) FinishWALImplAppend() {
	if g.startImplAppend.IsZero() {
		return
	}
	g.implCost = time.Since(g.startImplAppend)
}
// Finish reports the metrics of the append guarded by g, labelled with the
// status derived from err by parseError.
//
// It observes the walimpls duration only when implCost is non-zero, then
// observes the message size, increments the per-message-type counter, and
// observes the total time elapsed since StartAppend.
func (g *WriteGuard) Finish(err error) {
	// Named statusLabel so the imported status package is not shadowed.
	statusLabel := parseError(err)
	m := g.metrics

	if g.implCost != 0 {
		m.walimplsDuration.WithLabelValues(statusLabel).Observe(g.implCost.Seconds())
	}
	m.bytes.WithLabelValues(statusLabel).Observe(float64(g.bytes))
	m.total.WithLabelValues(g.msgType.String(), statusLabel).Inc()
	m.walDuration.WithLabelValues(statusLabel).Observe(time.Since(g.startAppend).Seconds())
}
// parseError converts an append error into the status label value used by the
// write metrics: OK for a nil error, Cancel when status.IsCanceled reports the
// error as canceled, and Error for anything else.
func parseError(err error) string {
	switch {
	case err == nil:
		return metrics.StreamingServiceClientStatusOK
	case status.IsCanceled(err):
		return metrics.StreamingServiceClientStatusCancel
	default:
		// NOTE(review): the constant is spelled "Streamign" here; presumably
		// that matches its declaration in the metrics package — confirm
		// before renaming.
		return metrics.StreamignServiceClientStatusError
	}
}