influxdb/replications/metrics/replications_metrics.go

123 lines
5.1 KiB
Go

package metrics
import (
"strconv"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/prometheus/client_golang/prometheus"
)
type ReplicationsMetrics struct {
TotalPointsQueued *prometheus.CounterVec
TotalBytesQueued *prometheus.CounterVec
CurrentBytesQueued *prometheus.GaugeVec
RemoteWriteErrors *prometheus.CounterVec
RemoteWriteBytesSent *prometheus.CounterVec
RemoteWriteBytesDropped *prometheus.CounterVec
PointsFailedToQueue *prometheus.CounterVec
BytesFailedToQueue *prometheus.CounterVec
}
func NewReplicationsMetrics() *ReplicationsMetrics {
const namespace = "replications"
const subsystem = "queue"
return &ReplicationsMetrics{
TotalPointsQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_points_queued",
Help: "Sum of all points that have been successfully added to the replication stream queue",
}, []string{"replicationID"}),
TotalBytesQueued: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_bytes_queued",
Help: "Sum of all bytes that have been successfully added to the replication stream queue",
}, []string{"replicationID"}),
CurrentBytesQueued: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "current_bytes_queued",
Help: "Current number of bytes in the replication stream queue remaining to be processed",
}, []string{"replicationID"}),
RemoteWriteErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_write_errors",
Help: "Error codes returned from attempted remote writes",
}, []string{"replicationID", "code"}),
RemoteWriteBytesSent: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_write_bytes_sent",
Help: "Bytes of data successfully sent to the remote by the replication stream",
}, []string{"replicationID"}),
RemoteWriteBytesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_write_bytes_dropped",
Help: "Bytes of data dropped due to remote write failures",
}, []string{"replicationID"}),
PointsFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "points_failed_to_queue",
Help: "Sum of all points that could not be added to the local replication queue",
}, []string{"replicationID"}),
BytesFailedToQueue: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "bytes_failed_to_queue",
Help: "Sum of all bytes that could not be added to the local replication queue",
}, []string{"replicationID"}),
}
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (rm *ReplicationsMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
rm.TotalPointsQueued,
rm.TotalBytesQueued,
rm.CurrentBytesQueued,
rm.RemoteWriteErrors,
rm.RemoteWriteBytesSent,
rm.RemoteWriteBytesDropped,
rm.PointsFailedToQueue,
rm.BytesFailedToQueue,
}
}
// EnqueueData updates the metrics when adding new data to a replication queue.
func (rm *ReplicationsMetrics) EnqueueData(replicationID platform.ID, numBytes, numPoints int, queueSize int64) {
rm.TotalPointsQueued.WithLabelValues(replicationID.String()).Add(float64(numPoints))
rm.TotalBytesQueued.WithLabelValues(replicationID.String()).Add(float64(numBytes))
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
}
// Dequeue updates the metrics when data has been removed from the queue.
func (rm *ReplicationsMetrics) Dequeue(replicationID platform.ID, queueSize int64) {
rm.CurrentBytesQueued.WithLabelValues(replicationID.String()).Set(float64(queueSize))
}
// EnqueueError updates the metrics when data fails to be added to the replication queue.
func (rm *ReplicationsMetrics) EnqueueError(replicationID platform.ID, numBytes, numPoints int) {
rm.PointsFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numPoints))
rm.BytesFailedToQueue.WithLabelValues(replicationID.String()).Add(float64(numBytes))
}
// RemoteWriteError increments the error code counter for the replication.
func (rm *ReplicationsMetrics) RemoteWriteError(replicationID platform.ID, errorCode int) {
rm.RemoteWriteErrors.WithLabelValues(replicationID.String(), strconv.Itoa(errorCode)).Inc()
}
// RemoteWriteSent increases the total count of bytes sent following a successful remote write
func (rm *ReplicationsMetrics) RemoteWriteSent(replicationID platform.ID, bytes int) {
rm.RemoteWriteBytesSent.WithLabelValues(replicationID.String()).Add(float64(bytes))
}
// RemoteWriteDropped increases the total count of bytes dropped when data is dropped
func (rm *ReplicationsMetrics) RemoteWriteDropped(replicationID platform.ID, bytes int) {
rm.RemoteWriteBytesDropped.WithLabelValues(replicationID.String()).Add(float64(bytes))
}