mirror of https://github.com/milvus-io/milvus.git
134 lines
4.5 KiB
Go
134 lines
4.5 KiB
Go
package metricsutil
|
|
|
|
import (
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/milvus-io/milvus/pkg/metrics"
|
|
"github.com/milvus-io/milvus/pkg/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
)
|
|
|
|
func NewScanMetrics(pchannel types.PChannelInfo) *ScanMetrics {
|
|
constLabel := prometheus.Labels{
|
|
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
|
|
metrics.WALChannelLabelName: pchannel.Name,
|
|
}
|
|
return &ScanMetrics{
|
|
constLabel: constLabel,
|
|
messageBytes: metrics.WALScanMessageBytes.With(constLabel),
|
|
passMessageBytes: metrics.WALScanPassMessageBytes.With(constLabel),
|
|
messageTotal: metrics.WALScanMessageTotal.MustCurryWith(constLabel),
|
|
passMessageTotal: metrics.WALScanPassMessageTotal.MustCurryWith(constLabel),
|
|
timeTickViolationTotal: metrics.WALScanTimeTickViolationMessageTotal.MustCurryWith(constLabel),
|
|
txnTotal: metrics.WALScanTxnTotal.MustCurryWith(constLabel),
|
|
pendingQueueSize: metrics.WALScannerPendingQueueBytes.With(constLabel),
|
|
timeTickBufSize: metrics.WALScannerTimeTickBufBytes.With(constLabel),
|
|
txnBufSize: metrics.WALScannerTxnBufBytes.With(constLabel),
|
|
}
|
|
}
|
|
|
|
type ScanMetrics struct {
|
|
constLabel prometheus.Labels
|
|
messageBytes prometheus.Observer
|
|
passMessageBytes prometheus.Observer
|
|
messageTotal *prometheus.CounterVec
|
|
passMessageTotal *prometheus.CounterVec
|
|
timeTickViolationTotal *prometheus.CounterVec
|
|
txnTotal *prometheus.CounterVec
|
|
timeTickBufSize prometheus.Gauge
|
|
txnBufSize prometheus.Gauge
|
|
pendingQueueSize prometheus.Gauge
|
|
}
|
|
|
|
// ObserveMessage observes the message.
|
|
func (m *ScanMetrics) ObserveMessage(msgType message.MessageType, bytes int) {
|
|
m.messageBytes.Observe(float64(bytes))
|
|
m.messageTotal.WithLabelValues(msgType.String()).Inc()
|
|
}
|
|
|
|
// ObserveFilteredMessage observes the filtered message.
|
|
func (m *ScanMetrics) ObserveFilteredMessage(msgType message.MessageType, bytes int) {
|
|
m.passMessageBytes.Observe(float64(bytes))
|
|
m.passMessageTotal.WithLabelValues(msgType.String()).Inc()
|
|
}
|
|
|
|
// ObserveTimeTickViolation observes the time tick violation.
|
|
func (m *ScanMetrics) ObserveTimeTickViolation(msgType message.MessageType) {
|
|
m.timeTickViolationTotal.WithLabelValues(msgType.String()).Inc()
|
|
}
|
|
|
|
// ObserveAutoCommitTxn observes the auto commit txn.
|
|
func (m *ScanMetrics) ObserveAutoCommitTxn() {
|
|
m.txnTotal.WithLabelValues("autocommit").Inc()
|
|
}
|
|
|
|
// ObserveTxn observes the txn.
|
|
func (m *ScanMetrics) ObserveTxn(state message.TxnState) {
|
|
m.txnTotal.WithLabelValues(state.String()).Inc()
|
|
}
|
|
|
|
// ObserveErrorTxn observes the error txn.
|
|
func (m *ScanMetrics) ObserveErrorTxn() {
|
|
m.txnTotal.WithLabelValues("error").Inc()
|
|
}
|
|
|
|
// ObserveExpiredTxn observes the expired txn.
|
|
func (m *ScanMetrics) ObserveExpiredTxn() {
|
|
m.txnTotal.WithLabelValues("expired").Inc()
|
|
}
|
|
|
|
// NewScannerMetrics creates a new scanner metrics.
|
|
func (m *ScanMetrics) NewScannerMetrics() *ScannerMetrics {
|
|
return &ScannerMetrics{
|
|
ScanMetrics: m,
|
|
previousTxnBufSize: 0,
|
|
previousTimeTickBufSize: 0,
|
|
previousPendingQueueSize: 0,
|
|
}
|
|
}
|
|
|
|
// Close closes the metrics.
|
|
func (m *ScanMetrics) Close() {
|
|
metrics.WALScanMessageBytes.Delete(m.constLabel)
|
|
metrics.WALScanPassMessageBytes.DeletePartialMatch(m.constLabel)
|
|
metrics.WALScanMessageTotal.DeletePartialMatch(m.constLabel)
|
|
metrics.WALScanPassMessageTotal.DeletePartialMatch(m.constLabel)
|
|
metrics.WALScanTimeTickViolationMessageTotal.DeletePartialMatch(m.constLabel)
|
|
metrics.WALScanTxnTotal.DeletePartialMatch(m.constLabel)
|
|
metrics.WALScannerTimeTickBufBytes.Delete(m.constLabel)
|
|
metrics.WALScannerTxnBufBytes.Delete(m.constLabel)
|
|
metrics.WALScannerPendingQueueBytes.Delete(m.constLabel)
|
|
}
|
|
|
|
type ScannerMetrics struct {
|
|
*ScanMetrics
|
|
previousTxnBufSize int
|
|
previousTimeTickBufSize int
|
|
previousPendingQueueSize int
|
|
}
|
|
|
|
func (m *ScannerMetrics) UpdatePendingQueueSize(size int) {
|
|
diff := size - m.previousPendingQueueSize
|
|
m.pendingQueueSize.Add(float64(diff))
|
|
m.previousPendingQueueSize = size
|
|
}
|
|
|
|
func (m *ScannerMetrics) UpdateTxnBufSize(size int) {
|
|
diff := size - m.previousTimeTickBufSize
|
|
m.timeTickBufSize.Add(float64(diff))
|
|
m.previousTimeTickBufSize = size
|
|
}
|
|
|
|
func (m *ScannerMetrics) UpdateTimeTickBufSize(size int) {
|
|
diff := size - m.previousTxnBufSize
|
|
m.txnBufSize.Add(float64(diff))
|
|
m.previousTxnBufSize = size
|
|
}
|
|
|
|
func (m *ScannerMetrics) Close() {
|
|
m.UpdatePendingQueueSize(0)
|
|
m.UpdateTimeTickBufSize(0)
|
|
m.UpdateTimeTickBufSize(0)
|
|
}
|