milvus/internal/streamingnode/server/wal/metricsutil/wal_scan.go

134 lines
4.5 KiB
Go

package metricsutil
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func NewScanMetrics(pchannel types.PChannelInfo) *ScanMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel.Name,
}
return &ScanMetrics{
constLabel: constLabel,
messageBytes: metrics.WALScanMessageBytes.With(constLabel),
passMessageBytes: metrics.WALScanPassMessageBytes.With(constLabel),
messageTotal: metrics.WALScanMessageTotal.MustCurryWith(constLabel),
passMessageTotal: metrics.WALScanPassMessageTotal.MustCurryWith(constLabel),
timeTickViolationTotal: metrics.WALScanTimeTickViolationMessageTotal.MustCurryWith(constLabel),
txnTotal: metrics.WALScanTxnTotal.MustCurryWith(constLabel),
pendingQueueSize: metrics.WALScannerPendingQueueBytes.With(constLabel),
timeTickBufSize: metrics.WALScannerTimeTickBufBytes.With(constLabel),
txnBufSize: metrics.WALScannerTxnBufBytes.With(constLabel),
}
}
type ScanMetrics struct {
constLabel prometheus.Labels
messageBytes prometheus.Observer
passMessageBytes prometheus.Observer
messageTotal *prometheus.CounterVec
passMessageTotal *prometheus.CounterVec
timeTickViolationTotal *prometheus.CounterVec
txnTotal *prometheus.CounterVec
timeTickBufSize prometheus.Gauge
txnBufSize prometheus.Gauge
pendingQueueSize prometheus.Gauge
}
// ObserveMessage observes the message.
func (m *ScanMetrics) ObserveMessage(msgType message.MessageType, bytes int) {
m.messageBytes.Observe(float64(bytes))
m.messageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveFilteredMessage observes the filtered message.
func (m *ScanMetrics) ObserveFilteredMessage(msgType message.MessageType, bytes int) {
m.passMessageBytes.Observe(float64(bytes))
m.passMessageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveTimeTickViolation observes the time tick violation.
func (m *ScanMetrics) ObserveTimeTickViolation(msgType message.MessageType) {
m.timeTickViolationTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveAutoCommitTxn observes the auto commit txn.
func (m *ScanMetrics) ObserveAutoCommitTxn() {
m.txnTotal.WithLabelValues("autocommit").Inc()
}
// ObserveTxn observes the txn.
func (m *ScanMetrics) ObserveTxn(state message.TxnState) {
m.txnTotal.WithLabelValues(state.String()).Inc()
}
// ObserveErrorTxn observes the error txn.
func (m *ScanMetrics) ObserveErrorTxn() {
m.txnTotal.WithLabelValues("error").Inc()
}
// ObserveExpiredTxn observes the expired txn.
func (m *ScanMetrics) ObserveExpiredTxn() {
m.txnTotal.WithLabelValues("expired").Inc()
}
// NewScannerMetrics creates a new scanner metrics.
func (m *ScanMetrics) NewScannerMetrics() *ScannerMetrics {
return &ScannerMetrics{
ScanMetrics: m,
previousTxnBufSize: 0,
previousTimeTickBufSize: 0,
previousPendingQueueSize: 0,
}
}
// Close closes the metrics.
func (m *ScanMetrics) Close() {
metrics.WALScanMessageBytes.Delete(m.constLabel)
metrics.WALScanPassMessageBytes.DeletePartialMatch(m.constLabel)
metrics.WALScanMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanPassMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanTimeTickViolationMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanTxnTotal.DeletePartialMatch(m.constLabel)
metrics.WALScannerTimeTickBufBytes.Delete(m.constLabel)
metrics.WALScannerTxnBufBytes.Delete(m.constLabel)
metrics.WALScannerPendingQueueBytes.Delete(m.constLabel)
}
type ScannerMetrics struct {
*ScanMetrics
previousTxnBufSize int
previousTimeTickBufSize int
previousPendingQueueSize int
}
func (m *ScannerMetrics) UpdatePendingQueueSize(size int) {
diff := size - m.previousPendingQueueSize
m.pendingQueueSize.Add(float64(diff))
m.previousPendingQueueSize = size
}
func (m *ScannerMetrics) UpdateTxnBufSize(size int) {
diff := size - m.previousTimeTickBufSize
m.timeTickBufSize.Add(float64(diff))
m.previousTimeTickBufSize = size
}
func (m *ScannerMetrics) UpdateTimeTickBufSize(size int) {
diff := size - m.previousTxnBufSize
m.txnBufSize.Add(float64(diff))
m.previousTxnBufSize = size
}
func (m *ScannerMetrics) Close() {
m.UpdatePendingQueueSize(0)
m.UpdateTimeTickBufSize(0)
m.UpdateTimeTickBufSize(0)
}