mirror of https://github.com/milvus-io/milvus.git
108 lines
3.2 KiB
Go
108 lines
3.2 KiB
Go
package metricsutil
|
|
|
|
import (
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
|
|
"github.com/milvus-io/milvus/pkg/metrics"
|
|
"github.com/milvus-io/milvus/pkg/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
)
|
|
|
|
// NewWriteMetrics creates a new WriteMetrics.
|
|
func NewWriteMetrics(pchannel types.PChannelInfo, walName string) *WriteMetrics {
|
|
constLabel := prometheus.Labels{
|
|
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
|
|
metrics.WALChannelLabelName: pchannel.Name,
|
|
}
|
|
metrics.WALInfo.WithLabelValues(
|
|
paramtable.GetStringNodeID(),
|
|
pchannel.Name,
|
|
strconv.FormatInt(pchannel.Term, 10),
|
|
walName).Set(1)
|
|
|
|
return &WriteMetrics{
|
|
walName: walName,
|
|
pchannel: pchannel,
|
|
constLabel: constLabel,
|
|
bytes: metrics.WALAppendMessageBytes.MustCurryWith(constLabel),
|
|
total: metrics.WALAppendMessageTotal.MustCurryWith(constLabel),
|
|
walDuration: metrics.WALAppendMessageDurationSeconds.MustCurryWith(constLabel),
|
|
walimplsDuration: metrics.WALImplsAppendMessageDurationSeconds.MustCurryWith(constLabel),
|
|
}
|
|
}
|
|
|
|
type WriteMetrics struct {
|
|
walName string
|
|
pchannel types.PChannelInfo
|
|
constLabel prometheus.Labels
|
|
bytes prometheus.ObserverVec
|
|
total *prometheus.CounterVec
|
|
walDuration prometheus.ObserverVec
|
|
walimplsDuration prometheus.ObserverVec
|
|
}
|
|
|
|
func (m *WriteMetrics) StartAppend(msgType message.MessageType, bytes int) *WriteGuard {
|
|
return &WriteGuard{
|
|
startAppend: time.Now(),
|
|
metrics: m,
|
|
msgType: msgType,
|
|
bytes: bytes,
|
|
}
|
|
}
|
|
|
|
func (m *WriteMetrics) Close() {
|
|
metrics.WALAppendMessageBytes.DeletePartialMatch(m.constLabel)
|
|
metrics.WALAppendMessageTotal.DeletePartialMatch(m.constLabel)
|
|
metrics.WALAppendMessageDurationSeconds.DeletePartialMatch(m.constLabel)
|
|
metrics.WALImplsAppendMessageDurationSeconds.DeletePartialMatch(m.constLabel)
|
|
metrics.WALInfo.DeleteLabelValues(
|
|
paramtable.GetStringNodeID(),
|
|
m.pchannel.Name,
|
|
strconv.FormatInt(m.pchannel.Term, 10),
|
|
m.walName,
|
|
)
|
|
}
|
|
|
|
type WriteGuard struct {
|
|
startAppend time.Time
|
|
startImplAppend time.Time
|
|
implCost time.Duration
|
|
metrics *WriteMetrics
|
|
msgType message.MessageType
|
|
bytes int
|
|
}
|
|
|
|
func (g *WriteGuard) StartWALImplAppend() {
|
|
g.startImplAppend = time.Now()
|
|
}
|
|
|
|
func (g *WriteGuard) FinishWALImplAppend() {
|
|
g.implCost = time.Since(g.startImplAppend)
|
|
}
|
|
|
|
func (g *WriteGuard) Finish(err error) {
|
|
status := parseError(err)
|
|
if g.implCost != 0 {
|
|
g.metrics.walimplsDuration.WithLabelValues(status).Observe(g.implCost.Seconds())
|
|
}
|
|
g.metrics.bytes.WithLabelValues(status).Observe(float64(g.bytes))
|
|
g.metrics.total.WithLabelValues(g.msgType.String(), status).Inc()
|
|
g.metrics.walDuration.WithLabelValues(status).Observe(time.Since(g.startAppend).Seconds())
|
|
}
|
|
|
|
// parseError parses the error to status.
|
|
func parseError(err error) string {
|
|
if err == nil {
|
|
return metrics.StreamingServiceClientStatusOK
|
|
}
|
|
if status.IsCanceled(err) {
|
|
return metrics.StreamingServiceClientStatusCancel
|
|
}
|
|
return metrics.StreamignServiceClientStatusError
|
|
}
|